You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/12/25 12:25:08 UTC
[dubbo] branch 3.0.5-release updated: fix async call issues, rpc context and response future callback race conditions (#9464)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0.5-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0.5-release by this push:
new 098787a fix async call issues, rpc context and response future callback race conditions (#9464)
098787a is described below
commit 098787a4ef95c53f43f2130c90995cbf4c8e732f
Author: ken.lj <ke...@gmail.com>
AuthorDate: Sat Dec 25 20:24:22 2021 +0800
fix async call issues, rpc context and response future callback race conditions (#9464)
fixes #9461 #8602
---
.../cluster/filter/DefaultFilterChainBuilder.java | 6 +-
.../rpc/cluster/filter/FilterChainBuilder.java | 204 ++++++++++++++++++++-
.../filter/DefaultFilterChainBuilderTest.java | 12 +-
.../support/FailoverClusterInvokerTest.java | 2 +-
.../support/MergeableClusterInvokerTest.java | 7 +-
.../support/wrapper/AbstractClusterTest.java | 4 +-
.../java/org/apache/dubbo/common/Experimental.java | 2 +-
.../dubbo/common/constants/CommonConstants.java | 5 +
.../AbstractRegistryCenterExporterListener.java | 36 +++-
.../dubbo/auth/filter/ProviderAuthFilterTest.java | 16 +-
.../java/org/apache/dubbo/rpc/AsyncContext.java | 46 +++++
.../org/apache/dubbo/rpc/AsyncContextImpl.java | 25 ++-
.../java/org/apache/dubbo/rpc/AsyncRpcResult.java | 44 ++---
.../main/java/org/apache/dubbo/rpc/BaseFilter.java | 23 +++
.../org/apache/dubbo/rpc/ListenableFilter.java | 3 +-
.../main/java/org/apache/dubbo/rpc/RpcContext.java | 74 +++++++-
.../org/apache/dubbo/rpc/RpcContextAttachment.java | 24 +++
.../org/apache/dubbo/rpc/RpcServiceContext.java | 41 ++++-
...rFilter.java => ClassLoaderCallbackFilter.java} | 33 +++-
.../apache/dubbo/rpc/filter/ClassLoaderFilter.java | 32 +++-
.../org/apache/dubbo/rpc/filter/ContextFilter.java | 2 +-
.../dubbo/rpc/proxy/AbstractProxyInvoker.java | 6 +-
.../dubbo/internal/org.apache.dubbo.rpc.Filter | 3 +-
.../java/org/apache/dubbo/rpc/RpcContextTest.java | 5 +
.../rpc/filter/CompatibleFilterFilterTest.java | 19 +-
.../apache/dubbo/rpc/filter/EchoFilterTest.java | 3 +-
.../apache/dubbo/rpc/filter/TimeoutFilterTest.java | 3 +-
.../apache/dubbo/rpc/support/MockInvocation.java | 4 +-
28 files changed, 576 insertions(+), 108 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java
index ff4f9b8..15be300 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java
@@ -67,8 +67,9 @@ public class DefaultFilterChainBuilder implements FilterChainBuilder {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
- last = new FilterChainNode<>(originalInvoker, next, filter);
+ last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);
}
+ return new CallbackRegistrationInvoker<>(last, filters);
}
return last;
@@ -103,8 +104,9 @@ public class DefaultFilterChainBuilder implements FilterChainBuilder {
for (int i = filters.size() - 1; i >= 0; i--) {
final ClusterFilter filter = filters.get(i);
final Invoker<T> next = last;
- last = new ClusterFilterChainNode<>(originalInvoker, next, filter);
+ last = new CopyOfClusterFilterChainNode<>(originalInvoker, next, filter);
}
+ return new ClusterCallbackRegistrationInvoker<>(originalInvoker, last, filters);
}
return last;
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java
index 7edab08..a329959 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java
@@ -16,8 +16,11 @@
*/
package org.apache.dubbo.rpc.cluster.filter;
+import org.apache.dubbo.common.Experimental;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.BaseFilter;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
@@ -28,6 +31,9 @@ import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.ClusterInvoker;
import org.apache.dubbo.rpc.cluster.Directory;
+import java.util.List;
+import java.util.stream.Collectors;
+
import static org.apache.dubbo.common.extension.ExtensionScope.APPLICATION;
@SPI(value = "default", scope = APPLICATION)
@@ -144,7 +150,7 @@ public interface FilterChainBuilder {
* @param <TYPE>
*/
class ClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>, FILTER extends BaseFilter>
- extends FilterChainNode<T, TYPE, FILTER> implements ClusterInvoker<T> {
+ extends FilterChainNode<T, TYPE, FILTER> implements ClusterInvoker<T> {
public ClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
super(originalInvoker, nextNode, filter);
}
@@ -165,4 +171,200 @@ public interface FilterChainBuilder {
return getOriginalInvoker().isDestroyed();
}
}
+
+ class CallbackRegistrationInvoker<T, FILTER extends BaseFilter> implements Invoker<T> {
+ static final Logger LOGGER = LoggerFactory.getLogger(CallbackRegistrationInvoker.class);
+ final Invoker<T> filterInvoker;
+ final List<FILTER> filters;
+
+ public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<FILTER> filters) {
+ this.filterInvoker = filterInvoker;
+ this.filters = filters;
+ }
+
+ @Override
+ public Result invoke(Invocation invocation) throws RpcException {
+ Result asyncResult = filterInvoker.invoke(invocation);
+ asyncResult.whenCompleteWithContext((r, t) -> {
+ for (int i = filters.size() - 1; i >= 0; i--) {
+ FILTER filter = filters.get(i);
+ try {
+ if (filter instanceof ListenableFilter) {
+ ListenableFilter listenableFilter = ((ListenableFilter) filter);
+ Filter.Listener listener = listenableFilter.listener(invocation);
+ try {
+ if (listener != null) {
+ if (t == null) {
+ listener.onResponse(r, filterInvoker, invocation);
+ } else {
+ listener.onError(t, filterInvoker, invocation);
+ }
+ }
+ } finally {
+ listenableFilter.removeListener(invocation);
+ }
+ } else if (filter instanceof FILTER.Listener) {
+ FILTER.Listener listener = (FILTER.Listener) filter;
+ if (t == null) {
+ listener.onResponse(r, filterInvoker, invocation);
+ } else {
+ listener.onError(t, filterInvoker, invocation);
+ }
+ }
+ } catch (Throwable filterThrowable) {
+ LOGGER.error(String.format("Exception occurred while executing the %s filter named %s.", i, filter.getClass().getSimpleName()));
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(String.format("Whole filter list is: %s", filters.stream().map(tmpFilter -> tmpFilter.getClass().getSimpleName()).collect(Collectors.toList())));
+ }
+ throw filterThrowable;
+ }
+ }
+ });
+
+ return asyncResult;
+ }
+
+ @Override
+ public Class<T> getInterface() {
+ return filterInvoker.getInterface();
+ }
+
+ @Override
+ public URL getUrl() {
+ return filterInvoker.getUrl();
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return filterInvoker.isAvailable();
+ }
+
+ @Override
+ public void destroy() {
+ filterInvoker.destroy();
+ }
+ }
+
+ class ClusterCallbackRegistrationInvoker<T, FILTER extends BaseFilter> extends CallbackRegistrationInvoker<T, FILTER>
+ implements ClusterInvoker<T> {
+ private ClusterInvoker<T> originalInvoker;
+
+ public ClusterCallbackRegistrationInvoker(ClusterInvoker<T> originalInvoker, Invoker<T> filterInvoker, List<FILTER> filters) {
+ super(filterInvoker, filters);
+ this.originalInvoker = originalInvoker;
+ }
+
+ public ClusterInvoker<T> getOriginalInvoker() {
+ return originalInvoker;
+ }
+
+ @Override
+ public URL getRegistryUrl() {
+ return getOriginalInvoker().getRegistryUrl();
+ }
+
+ @Override
+ public Directory<T> getDirectory() {
+ return getOriginalInvoker().getDirectory();
+ }
+
+ @Override
+ public boolean isDestroyed() {
+ return getOriginalInvoker().isDestroyed();
+ }
+ }
+
+
+ @Experimental("Works for the same purpose as FilterChainNode, replace FilterChainNode with this one when proved stable enough")
+ class CopyOfFilterChainNode<T, TYPE extends Invoker<T>, FILTER extends BaseFilter> implements Invoker<T> {
+ TYPE originalInvoker;
+ Invoker<T> nextNode;
+ FILTER filter;
+
+ public CopyOfFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
+ this.originalInvoker = originalInvoker;
+ this.nextNode = nextNode;
+ this.filter = filter;
+ }
+
+ public TYPE getOriginalInvoker() {
+ return originalInvoker;
+ }
+
+ @Override
+ public Class<T> getInterface() {
+ return originalInvoker.getInterface();
+ }
+
+ @Override
+ public URL getUrl() {
+ return originalInvoker.getUrl();
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return originalInvoker.isAvailable();
+ }
+
+ @Override
+ public Result invoke(Invocation invocation) throws RpcException {
+ Result asyncResult;
+ try {
+ asyncResult = filter.invoke(nextNode, invocation);
+ } catch (Exception e) {
+ if (filter instanceof ListenableFilter) {
+ ListenableFilter listenableFilter = ((ListenableFilter) filter);
+ try {
+ Filter.Listener listener = listenableFilter.listener(invocation);
+ if (listener != null) {
+ listener.onError(e, originalInvoker, invocation);
+ }
+ } finally {
+ listenableFilter.removeListener(invocation);
+ }
+ } else if (filter instanceof FILTER.Listener) {
+ FILTER.Listener listener = (FILTER.Listener) filter;
+ listener.onError(e, originalInvoker, invocation);
+ }
+ throw e;
+ } finally {
+
+ }
+ return asyncResult;
+ }
+
+ @Override
+ public void destroy() {
+ originalInvoker.destroy();
+ }
+
+ @Override
+ public String toString() {
+ return originalInvoker.toString();
+ }
+ }
+
+ @Experimental("Works for the same purpose as ClusterFilterChainNode, replace ClusterFilterChainNode with this one when proved stable enough")
+ class CopyOfClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>, FILTER extends BaseFilter>
+ extends CopyOfFilterChainNode<T, TYPE, FILTER> implements ClusterInvoker<T> {
+ public CopyOfClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
+ super(originalInvoker, nextNode, filter);
+ }
+
+
+ @Override
+ public URL getRegistryUrl() {
+ return getOriginalInvoker().getRegistryUrl();
+ }
+
+ @Override
+ public Directory<T> getDirectory() {
+ return getOriginalInvoker().getDirectory();
+ }
+
+ @Override
+ public boolean isDestroyed() {
+ return getOriginalInvoker().isDestroyed();
+ }
+ }
}
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilderTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilderTest.java
index 8a579b8..7a5779a 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilderTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilderTest.java
@@ -19,16 +19,17 @@ package org.apache.dubbo.rpc.cluster.filter;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
-import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
public class DefaultFilterChainBuilderTest {
@@ -62,8 +63,8 @@ public class DefaultFilterChainBuilderTest {
}
};
invokerAfterBuild = defaultFilterChainBuilder.buildInvokerChain(invokerWithFilter, REFERENCE_FILTER_KEY, CONSUMER);
- Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.FilterChainNode);
- Assertions.assertTrue(((FilterChainBuilder.FilterChainNode<?, ?, ?>) invokerAfterBuild).filter instanceof LogFilter);
+ Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.CallbackRegistrationInvoker);
+ Assertions.assertEquals(1, ((FilterChainBuilder.CallbackRegistrationInvoker<?, ?>) invokerAfterBuild).filters.size());
}
@@ -97,8 +98,7 @@ public class DefaultFilterChainBuilderTest {
}
};
invokerAfterBuild = defaultFilterChainBuilder.buildInvokerChain(invokerWithFilter, REFERENCE_FILTER_KEY, CONSUMER);
- Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.FilterChainNode);
- Assertions.assertTrue(((FilterChainBuilder.FilterChainNode<?, ?, ?>) invokerAfterBuild).filter instanceof LogFilter);
+ Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.CallbackRegistrationInvoker);
}
}
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
index 4b9fba5..1fd4075 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
@@ -295,7 +295,7 @@ public class FailoverClusterInvokerTest {
}
invokers.clear();
MockInvoker<Demo> invoker3 = new MockInvoker<>(Demo.class, url);
- invoker3.setResult(AsyncRpcResult.newDefaultAsyncResult(null));
+ invoker3.setResult(AsyncRpcResult.newDefaultAsyncResult(mock(RpcInvocation.class)));
invokers.add(invoker3);
dic.notify(invokers);
return null;
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java
index bb2ff1b..b95d0ef 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ModuleModel;
@@ -41,8 +42,6 @@ import java.util.Map;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.rpc.Constants.MERGER_KEY;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -55,7 +54,7 @@ public class MergeableClusterInvokerTest {
private Directory directory = mock(Directory.class);
private Invoker firstInvoker = mock(Invoker.class);
private Invoker secondInvoker = mock(Invoker.class);
- private Invocation invocation = mock(Invocation.class);
+ private Invocation invocation = mock(RpcInvocation.class);
private ModuleModel moduleModel = mock(ModuleModel.class);
private MergeableClusterInvoker<MenuService> mergeableClusterInvoker;
@@ -98,7 +97,7 @@ public class MergeableClusterInvokerTest {
directory = mock(Directory.class);
firstInvoker = mock(Invoker.class);
secondInvoker = mock(Invoker.class);
- invocation = mock(Invocation.class);
+ invocation = mock(RpcInvocation.class);
}
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractClusterTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractClusterTest.java
index 41eff47..6245b71 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractClusterTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractClusterTest.java
@@ -28,8 +28,8 @@ import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.filter.DemoService;
import org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
-
import org.apache.dubbo.rpc.model.ApplicationModel;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -69,7 +69,7 @@ public class AbstractClusterTest {
Invoker<?> invoker = demoCluster.join(directory, true);
Assertions.assertTrue(invoker instanceof AbstractCluster.ClusterFilterInvoker);
Assertions.assertTrue(((AbstractCluster.ClusterFilterInvoker<?>) invoker).getFilterInvoker()
- instanceof FilterChainBuilder.ClusterFilterChainNode);
+ instanceof FilterChainBuilder.ClusterCallbackRegistrationInvoker);
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Experimental.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Experimental.java
index 64f4cd3..746bc1b 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/Experimental.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Experimental.java
@@ -22,7 +22,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
- * Indicating unstable API, may get removed or changed in the next release.
+ * Indicating unstable API, may get removed or changed in future releases.
*/
@Retention(RetentionPolicy.CLASS)
@Target({
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index f564ea2..7b9de27 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -503,4 +503,9 @@ public interface CommonConstants {
String ENABLE_CONNECTIVITY_VALIDATION = "dubbo.connectivity.validation";
String DUBBO_INTERNAL_APPLICATION = "DUBBO_INTERNAL_APPLICATION";
+
+ String WORKING_CLASSLOADER_KEY = "WORKING_CLASSLOADER";
+
+ String STAGED_CLASSLOADER_KEY = "STAGED_CLASSLOADER";
+
}
diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/AbstractRegistryCenterExporterListener.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/AbstractRegistryCenterExporterListener.java
index 69a3d5e..74e1e69 100644
--- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/AbstractRegistryCenterExporterListener.java
+++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/AbstractRegistryCenterExporterListener.java
@@ -26,9 +26,9 @@ import org.apache.dubbo.rpc.listener.ListenerExporterWrapper;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.HashSet;
/**
* The abstraction of {@link ExporterListener} is to record exported exporters, which should be extended by different sub-classes.
@@ -56,12 +56,13 @@ public abstract class AbstractRegistryCenterExporterListener implements Exporter
@Override
public void exported(Exporter<?> exporter) throws RpcException {
ListenerExporterWrapper listenerExporterWrapper = (ListenerExporterWrapper) exporter;
- FilterChainBuilder.FilterChainNode filterChainNode = (FilterChainBuilder.FilterChainNode) listenerExporterWrapper.getInvoker();
- if (filterChainNode == null ||
- filterChainNode.getInterface() != getInterface()) {
+ FilterChainBuilder.CallbackRegistrationInvoker callbackRegistrationInvoker = (FilterChainBuilder.CallbackRegistrationInvoker) listenerExporterWrapper.getInvoker();
+ if (callbackRegistrationInvoker == null ||
+ callbackRegistrationInvoker.getInterface() != getInterface()) {
return;
}
exportedExporters.add(exporter);
+ FilterChainBuilder.CopyOfFilterChainNode filterChainNode = getFilterChainNode(callbackRegistrationInvoker);
do {
Filter filter = this.getFilter(filterChainNode);
if (filter != null) {
@@ -96,7 +97,24 @@ public abstract class AbstractRegistryCenterExporterListener implements Exporter
/**
* Use reflection to obtain {@link Filter}
*/
- private Filter getFilter(FilterChainBuilder.FilterChainNode filterChainNode) {
+ private FilterChainBuilder.CopyOfFilterChainNode getFilterChainNode(FilterChainBuilder.CallbackRegistrationInvoker callbackRegistrationInvoker) {
+ if (callbackRegistrationInvoker != null) {
+ Field field = null;
+ try {
+ field = callbackRegistrationInvoker.getClass().getDeclaredField("filterInvoker");
+ field.setAccessible(true);
+ return (FilterChainBuilder.CopyOfFilterChainNode) field.get(callbackRegistrationInvoker);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ // ignore
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Use reflection to obtain {@link Filter}
+ */
+ private Filter getFilter(FilterChainBuilder.CopyOfFilterChainNode filterChainNode) {
if (filterChainNode != null) {
Field field = null;
try {
@@ -111,17 +129,17 @@ public abstract class AbstractRegistryCenterExporterListener implements Exporter
}
/**
- * Use reflection to obtain {@link FilterChainBuilder.FilterChainNode}
+ * Use reflection to obtain {@link FilterChainBuilder.CopyOfFilterChainNode}
*/
- private FilterChainBuilder.FilterChainNode getNextNode(FilterChainBuilder.FilterChainNode filterChainNode) {
+ private FilterChainBuilder.CopyOfFilterChainNode getNextNode(FilterChainBuilder.CopyOfFilterChainNode filterChainNode) {
if (filterChainNode != null) {
Field field = null;
try {
field = filterChainNode.getClass().getDeclaredField("nextNode");
field.setAccessible(true);
Object object = field.get(filterChainNode);
- if (object instanceof FilterChainBuilder.FilterChainNode) {
- return (FilterChainBuilder.FilterChainNode) object;
+ if (object instanceof FilterChainBuilder.CopyOfFilterChainNode) {
+ return (FilterChainBuilder.CopyOfFilterChainNode) object;
}
} catch (NoSuchFieldException | IllegalAccessException e) {
// ignore
diff --git a/dubbo-plugin/dubbo-auth/src/test/java/org/apache/dubbo/auth/filter/ProviderAuthFilterTest.java b/dubbo-plugin/dubbo-auth/src/test/java/org/apache/dubbo/auth/filter/ProviderAuthFilterTest.java
index aac6287..6e12f6f 100644
--- a/dubbo-plugin/dubbo-auth/src/test/java/org/apache/dubbo/auth/filter/ProviderAuthFilterTest.java
+++ b/dubbo-plugin/dubbo-auth/src/test/java/org/apache/dubbo/auth/filter/ProviderAuthFilterTest.java
@@ -24,7 +24,9 @@ import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
+
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -43,7 +45,7 @@ class ProviderAuthFilterTest {
void testAuthDisabled() {
URL url = mock(URL.class);
Invoker invoker = mock(Invoker.class);
- Invocation invocation = mock(Invocation.class);
+ Invocation invocation = mock(RpcInvocation.class);
when(invoker.getUrl()).thenReturn(url);
ProviderAuthFilter providerAuthFilter = new ProviderAuthFilter(ApplicationModel.defaultModel());
providerAuthFilter.invoke(invoker, invocation);
@@ -58,7 +60,7 @@ class ProviderAuthFilterTest {
.addParameter(CommonConstants.APPLICATION_KEY, "test")
.addParameter(Constants.SERVICE_AUTH, true);
Invoker invoker = mock(Invoker.class);
- Invocation invocation = mock(Invocation.class);
+ Invocation invocation = mock(RpcInvocation.class);
when(invoker.getUrl()).thenReturn(url);
ProviderAuthFilter providerAuthFilter = new ProviderAuthFilter(ApplicationModel.defaultModel());
providerAuthFilter.invoke(invoker, invocation);
@@ -74,7 +76,7 @@ class ProviderAuthFilterTest {
.addParameter(CommonConstants.APPLICATION_KEY, "test")
.addParameter(Constants.SERVICE_AUTH, true);
Invoker invoker = mock(Invoker.class);
- Invocation invocation = mock(Invocation.class);
+ Invocation invocation = mock(RpcInvocation.class);
when(invocation.getAttachment(Constants.REQUEST_SIGNATURE_KEY)).thenReturn(null);
when(invoker.getUrl()).thenReturn(url);
@@ -92,7 +94,7 @@ class ProviderAuthFilterTest {
.addParameter(CommonConstants.APPLICATION_KEY, "test")
.addParameter(Constants.SERVICE_AUTH, true);
Invoker invoker = mock(Invoker.class);
- Invocation invocation = mock(Invocation.class);
+ Invocation invocation = mock(RpcInvocation.class);
when(invocation.getAttachment(Constants.REQUEST_SIGNATURE_KEY)).thenReturn(null);
when(invoker.getUrl()).thenReturn(url);
@@ -107,7 +109,7 @@ class ProviderAuthFilterTest {
.addParameter(CommonConstants.APPLICATION_KEY, "test-provider")
.addParameter(Constants.SERVICE_AUTH, true);
Invoker invoker = mock(Invoker.class);
- Invocation invocation = mock(Invocation.class);
+ Invocation invocation = mock(RpcInvocation.class);
when(invocation.getObjectAttachment(Constants.REQUEST_SIGNATURE_KEY)).thenReturn("dubbo");
when(invocation.getObjectAttachment(Constants.AK_KEY)).thenReturn("ak");
when(invocation.getObjectAttachment(CommonConstants.CONSUMER)).thenReturn("test-consumer");
@@ -135,7 +137,7 @@ class ProviderAuthFilterTest {
.addParameter(Constants.SERVICE_AUTH, true);
Invoker invoker = mock(Invoker.class);
- Invocation invocation = mock(Invocation.class);
+ Invocation invocation = mock(RpcInvocation.class);
when(invocation.getObjectAttachment(Constants.AK_KEY)).thenReturn("ak");
when(invocation.getObjectAttachment(CommonConstants.CONSUMER)).thenReturn("test-consumer");
when(invocation.getObjectAttachment(Constants.REQUEST_TIMESTAMP_KEY)).thenReturn(currentTimeMillis);
@@ -168,7 +170,7 @@ class ProviderAuthFilterTest {
.addParameter(CommonConstants.APPLICATION_KEY, "test-provider")
.addParameter(Constants.SERVICE_AUTH, true);
Invoker invoker = mock(Invoker.class);
- Invocation invocation = mock(Invocation.class);
+ Invocation invocation = mock(RpcInvocation.class);
when(invocation.getAttachment(Constants.AK_KEY)).thenReturn("ak");
when(invocation.getAttachment(CommonConstants.CONSUMER)).thenReturn("test-consumer");
when(invocation.getAttachment(Constants.REQUEST_TIMESTAMP_KEY)).thenReturn(String.valueOf(currentTimeMillis));
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java
index 1f51a54..0635d29 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java
@@ -74,4 +74,50 @@ public interface AsyncContext {
* </code>
*/
void signalContextSwitch();
+
+ /**
+ * Resetting the context is optional. If it is needed, reset the context only after the result has been written back.
+ *
+ * <code>
+ * public class AsyncServiceImpl implements AsyncService {
+ * public String sayHello(String name) {
+ * final AsyncContext asyncContext = RpcContext.startAsync();
+ * new Thread(() -> {
+ * <p>
+ * // the right place to use this method
+ * asyncContext.signalContextSwitch();
+ * <p>
+ * try {
+ * Thread.sleep(500);
+ * } catch (InterruptedException e) {
+ * e.printStackTrace();
+ * }
+ * asyncContext.write("Hello " + name + ", response from provider.");
+ * // only reset after asyncContext.write()
+ * asyncContext.resetContext();
+ * }).start();
+ * return null;
+ * }
+ * }
+ * </code>
+ *
+ * <code>
+ * public class AsyncServiceImpl implements AsyncService {
+ * public CompletableFuture sayHello(String name) {
+ * CompletableFuture future = new CompletableFuture();
+ * final AsyncContext asyncContext = RpcContext.startAsync();
+ * new Thread(() -> {
+ * // the right place to use this method
+ * asyncContext.signalContextSwitch();
+ * // some operations...
+ * future.complete("Hello " + name + ", response from provider.");
+ * // only reset after future.complete()
+ * asyncContext.resetContext();
+ * }).start();
+ * return future;
+ * }
+ * }
+ * </code>
+ */
+ void resetContext();
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
index 1331b21..9f9a083 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
@@ -26,12 +26,13 @@ public class AsyncContextImpl implements AsyncContext {
private CompletableFuture<Object> future;
- private RpcContextAttachment storedContext;
- private RpcContextAttachment storedServerContext;
+ private final RpcContext.RestoreContext restoreContext;
+ private final ClassLoader restoreClassLoader;
+ private ClassLoader stagedClassLoader;
public AsyncContextImpl() {
- this.storedContext = RpcContext.getClientAttachment();
- this.storedServerContext = RpcContext.getServerContext();
+ restoreContext = RpcContext.storeContext(false);
+ restoreClassLoader = Thread.currentThread().getContextClassLoader();
}
@Override
@@ -67,9 +68,19 @@ public class AsyncContextImpl implements AsyncContext {
@Override
public void signalContextSwitch() {
- RpcContext.restoreContext(storedContext);
- RpcContext.restoreServerContext(storedServerContext);
- // Restore any other contexts in here if necessary.
+ RpcContext.restoreContext(restoreContext);
+ if (restoreClassLoader != null) {
+ stagedClassLoader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(restoreClassLoader);
+ }
+ }
+
+ @Override
+ public void resetContext() {
+ RpcContext.removeContext();
+ if (stagedClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(restoreClassLoader);
+ }
}
public CompletableFuture<Object> getInternalFuture() {
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 1accccd..80a46da 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.rpc.model.ConsumerMethodModel;
+import org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -51,10 +52,10 @@ public class AsyncRpcResult implements Result {
/**
* RpcContext may already have been changed when callback happens, it happens when the same thread is used to execute another RPC call.
- * So we should keep the reference of current RpcContext instance and restore it before callback being executed.
+ * So we should keep a copy of the current RpcContext instance and restore it before the callback is executed.
*/
- private RpcContextAttachment storedContext;
- private RpcContextAttachment storedServerContext;
+ private RpcContext.RestoreContext storedContext;
+
private Executor executor;
private Invocation invocation;
@@ -64,8 +65,12 @@ public class AsyncRpcResult implements Result {
public AsyncRpcResult(CompletableFuture<AppResponse> future, Invocation invocation) {
this.responseFuture = future;
this.invocation = invocation;
- this.storedContext = RpcContext.getClientAttachment();
- this.storedServerContext = RpcContext.getServerContext();
+ RpcInvocation rpcInvocation = (RpcInvocation) invocation;
+ if (InvokeMode.SYNC != rpcInvocation.getInvokeMode() && !future.isDone()) {
+ this.storedContext = RpcContext.storeContext(false);
+ } else {
+ this.storedContext = RpcContext.storeContext(true);
+ }
}
/**
@@ -193,10 +198,17 @@ public class AsyncRpcResult implements Result {
public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn) {
this.responseFuture = this.responseFuture.whenComplete((v, t) -> {
- beforeContext.accept(v, t);
+ RpcContext.RestoreContext tmpContext = RpcContext.storeContext(false);
+ RpcContext.restoreContext(storedContext);
+
fn.accept(v, t);
- afterContext.accept(v, t);
+
+ RpcContext.restoreContext(tmpContext);
});
+
+ // Necessary! update future in context, see https://github.com/apache/dubbo/issues/9461
+ RpcContext.getServiceContext().setFuture(new FutureAdapter<>(this.responseFuture));
+
return this;
}
@@ -281,24 +293,6 @@ public class AsyncRpcResult implements Result {
}
/**
- * tmp context to use when the thread switch to Dubbo thread.
- */
- private RpcContextAttachment tmpContext;
-
- private RpcContextAttachment tmpServerContext;
- private BiConsumer<Result, Throwable> beforeContext = (appResponse, t) -> {
- tmpContext = RpcContext.getClientAttachment();
- tmpServerContext = RpcContext.getServerContext();
- RpcContext.restoreContext(storedContext);
- RpcContext.restoreServerContext(storedServerContext);
- };
-
- private BiConsumer<Result, Throwable> afterContext = (appResponse, t) -> {
- RpcContext.restoreContext(tmpContext);
- RpcContext.restoreServerContext(tmpServerContext);
- };
-
- /**
* Some utility methods used to quickly generate default AsyncRpcResult instance.
*/
public static AsyncRpcResult newDefaultAsyncResult(AppResponse appResponse, Invocation invocation) {
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java
index 594954f..02c643a 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java
@@ -22,10 +22,33 @@ public interface BaseFilter {
*/
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
+ /**
+ * This callback listener applies to both synchronous and asynchronous calls. Put logic that needs to be executed
+ * when the rpc result returns in onResponse or onError, depending on whether it is a normal return or an exceptional return.
+ * <p>
+ * Pay attention when refactoring legacy synchronous-style filters: move logic
+ * previously defined in the 'finally' block to both onResponse and onError.
+ */
interface Listener {
+ /**
+ * This method will only be called on successful remote rpc execution, that is, the remote service received
+ * the request and the result (normal or exceptional) was returned successfully.
+ *
+ * @param appResponse, the rpc call result, it can represent both normal result and exceptional result
+ * @param invoker, context
+ * @param invocation, context
+ */
void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
+ /**
+ * This method will be called on detection of framework exceptions, for example, TimeoutException, NetworkException,
+ * exceptions raised in Filters, etc.
+ *
+ * @param t, framework exception
+ * @param invoker, context
+ * @param invocation, context
+ */
void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
}
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ListenableFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ListenableFilter.java
index 160ddcf..b590bfb 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ListenableFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ListenableFilter.java
@@ -23,9 +23,10 @@ import java.util.concurrent.ConcurrentMap;
* It's recommended to implement Filter.Listener directly for callback registration, check the default implementation,
* see {@link org.apache.dubbo.rpc.filter.ExceptionFilter}, for example.
* <p>
- * If you do not want to share Listener instance between RPC calls. You can use ListenableFilter
+ * If you do not want to share a Listener instance between RPC calls, ListenableFilter can be used
* to keep a 'one Listener each RPC call' model.
*/
+@Deprecated
public abstract class ListenableFilter implements Filter {
protected Listener listener = null;
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
index 13f108b..1db82f1 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
@@ -122,10 +122,6 @@ public class RpcContext {
return SERVER_LOCAL.get();
}
- public static void restoreServerContext(RpcContextAttachment oldServerContext) {
- SERVER_LOCAL.set(oldServerContext);
- }
-
/**
* remove server side context.
*
@@ -198,10 +194,6 @@ public class RpcContext {
this.remove = remove;
}
- public static void restoreContext(RpcContextAttachment oldContext) {
- CLIENT_ATTACHMENT.set(oldContext);
- }
-
/**
* remove context.
*
@@ -810,4 +802,70 @@ public class RpcContext {
public static void setRpcContext(URL url) {
RpcServiceContext.setRpcContext(url);
}
+
+ protected static RestoreContext storeContext(boolean needCopy) {
+ return new RestoreContext(needCopy);
+ }
+
+ protected static void restoreContext(RestoreContext restoreContext) {
+ if (restoreContext != null) {
+ restoreContext.restore();
+ }
+ }
+
+ protected static void restoreClientAttachment(RpcContextAttachment oldContext) {
+ CLIENT_ATTACHMENT.set(oldContext);
+ }
+
+ protected static void restoreServerContext(RpcContextAttachment oldServerContext) {
+ SERVER_LOCAL.set(oldServerContext);
+ }
+
+ protected static void restoreServerAttachment(RpcContextAttachment oldServerContext) {
+ SERVER_ATTACHMENT.set(oldServerContext);
+ }
+
+ protected static void restoreServiceContext(RpcServiceContext oldServiceContext) {
+ SERVICE_CONTEXT.set(oldServiceContext);
+ }
+
+ /**
+ * Used to temporarily store and restore all kinds of contexts of current thread.
+ */
+ public static class RestoreContext {
+ private final RpcServiceContext serviceContext;
+ private final RpcContextAttachment clientAttachment;
+ private final RpcContextAttachment serverAttachment;
+ private final RpcContextAttachment serverLocal;
+
+ public RestoreContext(boolean needCopy) {
+ serviceContext = getServiceContext().copyOf(needCopy);
+ clientAttachment = getClientAttachment().copyOf(needCopy);
+ serverAttachment = getServerAttachment().copyOf(needCopy);
+ serverLocal = getServerContext().copyOf(needCopy);
+ }
+
+ public void restore() {
+ if (serviceContext != null) {
+ restoreServiceContext(serviceContext);
+ } else {
+ removeServiceContext();
+ }
+ if (clientAttachment != null) {
+ restoreClientAttachment(clientAttachment);
+ } else {
+ removeClientAttachment();
+ }
+ if (serverAttachment != null) {
+ restoreServerAttachment(serverAttachment);
+ } else {
+ removeServerAttachment();
+ }
+ if (serverLocal != null) {
+ restoreServerContext(serverLocal);
+ } else {
+ removeServerContext();
+ }
+ }
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContextAttachment.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContextAttachment.java
index ddbd981..aec6943 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContextAttachment.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContextAttachment.java
@@ -201,4 +201,28 @@ public class RpcContextAttachment extends RpcContext{
return getAttachment(key);
}
+ /**
+ * Also see {@link RpcServiceContext#copyOf(boolean)}
+ *
+ * @return null when there are no attachments; otherwise this instance, or (if needCopy) a new instance holding a shallow copy of the attachment map
+ */
+ public RpcContextAttachment copyOf(boolean needCopy) {
+ if (!isValid()) {
+ return null;
+ }
+
+ if (needCopy) {
+ RpcContextAttachment copy = new RpcContextAttachment();
+ if (CollectionUtils.isNotEmptyMap(attachments)) {
+ copy.attachments.putAll(this.attachments);
+ }
+ return copy;
+ } else {
+ return this;
+ }
+ }
+
+ private boolean isValid() {
+ return CollectionUtils.isNotEmptyMap(attachments);
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
index ebec31f..e61632d 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
@@ -41,6 +41,9 @@ public class RpcServiceContext extends RpcContext {
protected RpcServiceContext() {
}
+ // RPC service context updated before each service call.
+ private URL consumerUrl;
+
private List<URL> urls;
private URL url;
@@ -584,9 +587,6 @@ public class RpcServiceContext extends RpcContext {
return asyncContext;
}
- // RPC service context updated before each service call.
- private URL consumerUrl;
-
@Override
public String getGroup() {
if (consumerUrl == null) {
@@ -649,4 +649,39 @@ public class RpcServiceContext extends RpcContext {
RpcServiceContext rpcContext = RpcContext.getServiceContext();
rpcContext.setConsumerUrl(url);
}
+
+ /**
+ * Only part of the properties are copied, the others are either not used currently or can be got from invocation.
+ * Also see {@link RpcContextAttachment#copyOf(boolean)}
+ *
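+ * @param needCopy true to create a new instance, false to reuse this instance
+ * @return this instance when needCopy is false or none of the copied properties is set; otherwise a shallow copy of RpcServiceContext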
+ */
+ public RpcServiceContext copyOf(boolean needCopy) {
+ if (!isValid()) {
+ return this;
+ }
+
+ if (needCopy) {
+ RpcServiceContext copy = new RpcServiceContext();
+ copy.consumerUrl = this.consumerUrl;
+ copy.localAddress = this.localAddress;
+ copy.remoteAddress = this.remoteAddress;
+ copy.invocation = this.invocation;
+ copy.asyncContext = this.asyncContext;
+ return copy;
+ } else {
+ return this;
+ }
+ }
+
+
+ private boolean isValid() {
+ return this.consumerUrl != null
+ || this.localAddress != null
+ || this.remoteAddress != null
+ || this.invocation != null
+ || this.asyncContext != null;
+ }
+
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderCallbackFilter.java
similarity index 56%
copy from dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java
copy to dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderCallbackFilter.java
index c71e2d1..a9b1269 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderCallbackFilter.java
@@ -18,27 +18,40 @@ package org.apache.dubbo.rpc.filter;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.BaseFilter;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
+import static org.apache.dubbo.common.constants.CommonConstants.WORKING_CLASSLOADER_KEY;
+
/**
- * Set the current execution thread class loader to service interface's class loader.
+ * Switch thread context class loader on filter callback.
*/
-@Activate(group = CommonConstants.PROVIDER, order = -30000)
-public class ClassLoaderFilter implements Filter {
+@Activate(group = CommonConstants.PROVIDER, order = Integer.MAX_VALUE)
+public class ClassLoaderCallbackFilter implements Filter, BaseFilter.Listener {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
- ClassLoader ocl = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
- try {
- return invoker.invoke(invocation);
- } finally {
- Thread.currentThread().setContextClassLoader(ocl);
- }
+ return invoker.invoke(invocation);
+ }
+
+ @Override
+ public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
+ setClassLoader(invoker, invocation);
}
+ @Override
+ public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+ setClassLoader(invoker, invocation);
+ }
+
+ private void setClassLoader(Invoker<?> invoker, Invocation invocation) {
+ ClassLoader workingClassLoader = (ClassLoader) invocation.get(WORKING_CLASSLOADER_KEY);
+ if (workingClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(workingClassLoader);
+ }
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java
index c71e2d1..6a14fe1 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java
@@ -18,27 +18,51 @@ package org.apache.dubbo.rpc.filter;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.BaseFilter;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
+import static org.apache.dubbo.common.constants.CommonConstants.STAGED_CLASSLOADER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.WORKING_CLASSLOADER_KEY;
+
/**
* Set the current execution thread class loader to service interface's class loader.
*/
@Activate(group = CommonConstants.PROVIDER, order = -30000)
-public class ClassLoaderFilter implements Filter {
+public class ClassLoaderFilter implements Filter, BaseFilter.Listener {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
- ClassLoader ocl = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
+ ClassLoader stagedClassLoader = Thread.currentThread().getContextClassLoader();
+ ClassLoader effectiveClassLoader = invoker.getInterface().getClassLoader();
+ invocation.put(STAGED_CLASSLOADER_KEY, stagedClassLoader);
+ invocation.put(WORKING_CLASSLOADER_KEY, effectiveClassLoader);
+
+ Thread.currentThread().setContextClassLoader(effectiveClassLoader);
try {
return invoker.invoke(invocation);
} finally {
- Thread.currentThread().setContextClassLoader(ocl);
+ Thread.currentThread().setContextClassLoader(stagedClassLoader);
}
}
+ @Override
+ public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
+ resetClassLoader(invoker, invocation);
+ }
+
+ @Override
+ public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+ resetClassLoader(invoker, invocation);
+ }
+
+ private void resetClassLoader(Invoker<?> invoker, Invocation invocation) {
+ ClassLoader stagedClassLoader = (ClassLoader) invocation.get(STAGED_CLASSLOADER_KEY);
+ if (stagedClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(stagedClassLoader);
+ }
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
index 08a1286..3e38c67 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
@@ -133,7 +133,7 @@ public class ContextFilter implements Filter, Filter.Listener {
context.clearAfterEachInvoke(true);
RpcContext.removeServerAttachment();
RpcContext.removeServiceContext();
- // IMPORTANT! For async scenario, we must remove context from current thread, so we always create a new RpcContext for the next invoke for the same thread.
+ // IMPORTANT! For async scenario, context must be removed from current thread, so a new RpcContext is always created for the next invoke for the same thread.
RpcContext.getClientAttachment().removeAttachment(TIME_COUNTDOWN_KEY);
RpcContext.removeServerContext();
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
index 2230f50..336671a 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
@@ -108,10 +108,10 @@ public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
}
private CompletableFuture<Object> wrapWithFuture(Object value) {
- if (RpcContext.getServiceContext().isAsyncStarted()) {
- return ((AsyncContextImpl)(RpcContext.getServiceContext().getAsyncContext())).getInternalFuture();
- } else if (value instanceof CompletableFuture) {
+ if (value instanceof CompletableFuture) {
return (CompletableFuture<Object>) value;
+ } else if (RpcContext.getServiceContext().isAsyncStarted()) {
+ return ((AsyncContextImpl) (RpcContext.getServiceContext().getAsyncContext())).getInternalFuture();
}
return CompletableFuture.completedFuture(value);
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
index 7c526c2..34d56b4 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
@@ -4,10 +4,11 @@ genericimpl=org.apache.dubbo.rpc.filter.GenericImplFilter
token=org.apache.dubbo.rpc.filter.TokenFilter
accesslog=org.apache.dubbo.rpc.filter.AccessLogFilter
classloader=org.apache.dubbo.rpc.filter.ClassLoaderFilter
+classloader-callback=org.apache.dubbo.rpc.filter.ClassLoaderCallbackFilter
context=org.apache.dubbo.rpc.filter.ContextFilter
exception=org.apache.dubbo.rpc.filter.ExceptionFilter
executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter
compatible=org.apache.dubbo.rpc.filter.CompatibleFilter
timeout=org.apache.dubbo.rpc.filter.TimeoutFilter
-tps=org.apache.dubbo.rpc.filter.TpsLimitFilter
\ No newline at end of file
+tps=org.apache.dubbo.rpc.filter.TpsLimitFilter
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java
index 4f1e6e9..a7dbd8f 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java
@@ -202,4 +202,9 @@ public class RpcContextTest {
rpcContext.setObjectAttachments(map);
Assertions.assertEquals(map, rpcContext.getObjectAttachments());
}
+
+ @Test
+ public void testRestore() {
+
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/CompatibleFilterFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/CompatibleFilterFilterTest.java
index 7750195..9a5152b 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/CompatibleFilterFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/CompatibleFilterFilterTest.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.support.DemoService;
import org.apache.dubbo.rpc.support.Type;
@@ -49,7 +50,7 @@ public class CompatibleFilterFilterTest {
@Test
public void testInvokerGeneric() {
- invocation = mock(Invocation.class);
+ invocation = mock(RpcInvocation.class);
given(invocation.getMethodName()).willReturn("$enumlength");
given(invocation.getParameterTypes()).willReturn(new Class<?>[]{Enum.class});
given(invocation.getArguments()).willReturn(new Object[]{"hello"});
@@ -69,7 +70,7 @@ public class CompatibleFilterFilterTest {
@Test
public void testResultHasException() {
- invocation = mock(Invocation.class);
+ invocation = mock(RpcInvocation.class);
given(invocation.getMethodName()).willReturn("enumlength");
given(invocation.getParameterTypes()).willReturn(new Class<?>[]{Enum.class});
given(invocation.getArguments()).willReturn(new Object[]{"hello"});
@@ -90,7 +91,7 @@ public class CompatibleFilterFilterTest {
@Test
public void testInvokerJsonPojoSerialization() throws Exception {
- invocation = mock(Invocation.class);
+ invocation = mock(RpcInvocation.class);
given(invocation.getMethodName()).willReturn("enumlength");
given(invocation.getParameterTypes()).willReturn(new Class<?>[]{Type[].class});
given(invocation.getArguments()).willReturn(new Object[]{"hello"});
@@ -100,7 +101,8 @@ public class CompatibleFilterFilterTest {
given(invoker.getInterface()).willReturn(DemoService.class);
AppResponse result = new AppResponse();
result.setValue("High");
- given(invoker.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult(result, invocation));
+ AsyncRpcResult defaultAsyncResult = AsyncRpcResult.newDefaultAsyncResult(result, invocation);
+ given(invoker.invoke(invocation)).willReturn(defaultAsyncResult);
URL url = URL.valueOf("test://test:11/test?group=dubbo&version=1.1&serialization=json");
given(invoker.getUrl()).willReturn(url);
@@ -112,7 +114,7 @@ public class CompatibleFilterFilterTest {
@Test
public void testInvokerNonJsonEnumSerialization() throws Exception {
- invocation = mock(Invocation.class);
+ invocation = mock(RpcInvocation.class);
given(invocation.getMethodName()).willReturn("enumlength");
given(invocation.getParameterTypes()).willReturn(new Class<?>[]{Type[].class});
given(invocation.getArguments()).willReturn(new Object[]{"hello"});
@@ -122,7 +124,8 @@ public class CompatibleFilterFilterTest {
given(invoker.getInterface()).willReturn(DemoService.class);
AppResponse result = new AppResponse();
result.setValue("High");
- given(invoker.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult(result, invocation));
+ AsyncRpcResult defaultAsyncResult = AsyncRpcResult.newDefaultAsyncResult(result, invocation);
+ given(invoker.invoke(invocation)).willReturn(defaultAsyncResult);
URL url = URL.valueOf("test://test:11/test?group=dubbo&version=1.1");
given(invoker.getUrl()).willReturn(url);
@@ -134,7 +137,7 @@ public class CompatibleFilterFilterTest {
@Test
public void testInvokerNonJsonNonPojoSerialization() {
- invocation = mock(Invocation.class);
+ invocation = mock(RpcInvocation.class);
given(invocation.getMethodName()).willReturn("echo");
given(invocation.getParameterTypes()).willReturn(new Class<?>[]{String.class});
given(invocation.getArguments()).willReturn(new Object[]{"hello"});
@@ -154,7 +157,7 @@ public class CompatibleFilterFilterTest {
@Test
public void testInvokerNonJsonPojoSerialization() {
- invocation = mock(Invocation.class);
+ invocation = mock(RpcInvocation.class);
given(invocation.getMethodName()).willReturn("echo");
given(invocation.getParameterTypes()).willReturn(new Class<?>[]{String.class});
given(invocation.getArguments()).willReturn(new Object[]{"hello"});
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/EchoFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/EchoFilterTest.java
index 76ee024..b31d225 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/EchoFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/EchoFilterTest.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.support.DemoService;
import org.junit.jupiter.api.Test;
@@ -37,7 +38,7 @@ public class EchoFilterTest {
@SuppressWarnings("unchecked")
@Test
public void testEcho() {
- Invocation invocation = mock(Invocation.class);
+ Invocation invocation = mock(RpcInvocation.class);
given(invocation.getMethodName()).willReturn("$echo");
given(invocation.getParameterTypes()).willReturn(new Class<?>[]{Enum.class});
given(invocation.getArguments()).willReturn(new Object[]{"hello"});
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/TimeoutFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/TimeoutFilterTest.java
index be85365..2564273 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/TimeoutFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/TimeoutFilterTest.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.support.BlockMyInvoker;
import org.junit.jupiter.api.Assertions;
@@ -56,7 +57,7 @@ public class TimeoutFilterTest {
URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1&timeout=" + timeout);
Invoker invoker = new BlockMyInvoker(url, (timeout + 100));
- Invocation invocation = Mockito.mock(Invocation.class);
+ Invocation invocation = Mockito.mock(RpcInvocation.class);
when(invocation.getMethodName()).thenReturn("testInvokeWithTimeout");
Result result = timeoutFilter.invoke(invoker, invocation);
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java
index 2c8a618..ae8a69c 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java
@@ -17,8 +17,8 @@
package org.apache.dubbo.rpc.support;
import org.apache.dubbo.rpc.AttachmentsAdapter;
-import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ServiceModel;
import java.util.HashMap;
@@ -34,7 +34,7 @@ import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
/**
* MockInvocation.java
*/
-public class MockInvocation implements Invocation {
+public class MockInvocation extends RpcInvocation {
private Map<String, Object> attachments;