You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/05/12 05:57:53 UTC
[dubbo] branch 3.0 updated: [3.0] Support Injvm invoke server async (#10026)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new d6e14980a5 [3.0] Support Injvm invoke server async (#10026)
d6e14980a5 is described below
commit d6e14980a5a17bcfebab6190cb0ae31ab8d931c2
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Thu May 12 13:57:18 2022 +0800
[3.0] Support Injvm invoke server async (#10026)
* [3.0] Support Injvm invoke server async
* add timeout support
* return async
---
.../java/org/apache/dubbo/rpc/AsyncRpcResult.java | 2 +
.../dubbo/rpc/protocol/injvm/InjvmInvoker.java | 65 +++++++++++++++++-----
2 files changed, 54 insertions(+), 13 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index ebf587a3aa..027dcd4892 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -200,6 +200,8 @@ public class AsyncRpcResult implements Result {
RpcInvocation rpcInvocation = (RpcInvocation) invocation;
if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
return RpcContext.getClientAttachment().getFuture();
+ } else if (InvokeMode.ASYNC == rpcInvocation.getInvokeMode()) {
+ return createDefaultValue(invocation).recreate();
}
return getAppResponse().recreate();
diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
index 2d4f734c88..1dcc98806d 100644
--- a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
@@ -34,9 +34,11 @@ import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceModel;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
+import org.apache.dubbo.rpc.support.RpcUtils;
import java.lang.reflect.Type;
import java.util.HashMap;
@@ -44,10 +46,14 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.LOCALHOST_VALUE;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;
/**
@@ -99,8 +105,7 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
if (serverHasToken) {
invocation.setAttachment(Constants.TOKEN_KEY, serverURL.getParameter(Constants.TOKEN_KEY));
}
-
- invocation.setAttachment(TIMEOUT_KEY, invoker.getUrl().getMethodPositiveParameter(invocation.getMethodName(), TIMEOUT_KEY, DEFAULT_TIMEOUT));
+ invocation.setAttachment(TIMEOUT_KEY, calculateTimeout(invocation, invocation.getMethodName()));
String desc = ReflectUtils.getDesc(invocation.getParameterTypes());
@@ -138,16 +143,35 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
} finally {
InternalThreadLocalMap.set(originTL);
}
- if (result.hasException()) {
- AsyncRpcResult rpcResult = AsyncRpcResult.newDefaultAsyncResult(result.getException(), copiedInvocation);
- rpcResult.setObjectAttachments(new HashMap<>(result.getObjectAttachments()));
- return rpcResult;
+ CompletableFuture<AppResponse> future = new CompletableFuture<>();
+ AppResponse rpcResult = new AppResponse(copiedInvocation);
+ if (result instanceof AsyncRpcResult) {
+ result.whenCompleteWithContext((r, t) -> {
+ if (t != null) {
+ rpcResult.setException(t);
+ } else {
+ if (r.hasException()) {
+ rpcResult.setException(r.getException());
+ } else {
+ Object rebuildValue = rebuildValue(invocation, desc, r.getValue());
+ rpcResult.setValue(rebuildValue);
+ }
+ }
+ rpcResult.setObjectAttachments(new HashMap<>(r.getObjectAttachments()));
+ future.complete(rpcResult);
+ });
} else {
- rebuildValue(invocation, desc, result);
- AsyncRpcResult rpcResult = AsyncRpcResult.newDefaultAsyncResult(result.getValue(), copiedInvocation);
+ if (result.hasException()) {
+ rpcResult.setException(result.getException());
+ } else {
+ Object rebuildValue = rebuildValue(invocation, desc, result.getValue());
+ rpcResult.setValue(rebuildValue);
+ }
rpcResult.setObjectAttachments(new HashMap<>(result.getObjectAttachments()));
- return rpcResult;
+ future.complete(rpcResult);
}
+ return new AsyncRpcResult(future, invocation);
+
}
}
@@ -175,7 +199,7 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
ServiceModel consumerServiceModel = invocation.getServiceModel();
boolean shouldSkip = shouldIgnoreSameModule && consumerServiceModel != null &&
Objects.equals(providerServiceModel.getModuleModel(), consumerServiceModel.getModuleModel());
- if(CommonConstants.$INVOKE.equals(methodName) || shouldSkip) {
+ if (CommonConstants.$INVOKE.equals(methodName) || shouldSkip) {
// generic invoke, skip copy arguments
RpcInvocation copiedInvocation = new RpcInvocation(invocation.getTargetServiceUniqueName(),
providerServiceModel, methodName, invocation.getServiceName(), invocation.getProtocolServiceKey(),
@@ -222,8 +246,7 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
}
}
- private void rebuildValue(Invocation invocation, String desc, Result result) {
- Object originValue = result.getValue();
+ private Object rebuildValue(Invocation invocation, String desc, Object originValue) {
Object value = originValue;
ClassLoader cl = Thread.currentThread().getContextClassLoader();
try {
@@ -235,7 +258,7 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
value = paramDeepCopyUtil.copy(getUrl(), originValue, returnType);
}
}
- result.setValue(value);
+ return value;
} finally {
Thread.currentThread().setContextClassLoader(cl);
}
@@ -248,4 +271,20 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
return remoteUrl.getParameter(ASYNC_KEY, false);
}
+ private int calculateTimeout(Invocation invocation, String methodName) {
+ Object countdown = RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
+ int timeout;
+ if (countdown == null) {
+ timeout = (int) RpcUtils.getTimeout(getUrl(), methodName, RpcContext.getClientAttachment(), DEFAULT_TIMEOUT);
+ if (getUrl().getParameter(ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
+ invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout); // pass timeout to remote server
+ }
+ } else {
+ TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
+ timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
+ invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout);// pass timeout to remote server
+ }
+ return timeout;
+ }
+
}