You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/05/12 05:57:53 UTC

[dubbo] branch 3.0 updated: [3.0] Support Injvm invoke server async (#10026)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new d6e14980a5 [3.0] Support Injvm invoke server async (#10026)
d6e14980a5 is described below

commit d6e14980a5a17bcfebab6190cb0ae31ab8d931c2
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Thu May 12 13:57:18 2022 +0800

    [3.0] Support Injvm invoke server async (#10026)
    
    * [3.0] Support Injvm invoke server async
    
    * add timeout support
    
    * return async
---
 .../java/org/apache/dubbo/rpc/AsyncRpcResult.java  |  2 +
 .../dubbo/rpc/protocol/injvm/InjvmInvoker.java     | 65 +++++++++++++++++-----
 2 files changed, 54 insertions(+), 13 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index ebf587a3aa..027dcd4892 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -200,6 +200,8 @@ public class AsyncRpcResult implements Result {
         RpcInvocation rpcInvocation = (RpcInvocation) invocation;
         if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
             return RpcContext.getClientAttachment().getFuture();
+        } else if (InvokeMode.ASYNC == rpcInvocation.getInvokeMode()) {
+            return createDefaultValue(invocation).recreate();
         }
 
         return getAppResponse().recreate();
diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
index 2d4f734c88..1dcc98806d 100644
--- a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
@@ -34,9 +34,11 @@ import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.TimeoutCountDown;
 import org.apache.dubbo.rpc.model.MethodDescriptor;
 import org.apache.dubbo.rpc.model.ServiceModel;
 import org.apache.dubbo.rpc.protocol.AbstractInvoker;
+import org.apache.dubbo.rpc.support.RpcUtils;
 
 import java.lang.reflect.Type;
 import java.util.HashMap;
@@ -44,10 +46,14 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.LOCALHOST_VALUE;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
 import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;
 
 /**
@@ -99,8 +105,7 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
         if (serverHasToken) {
             invocation.setAttachment(Constants.TOKEN_KEY, serverURL.getParameter(Constants.TOKEN_KEY));
         }
-
-        invocation.setAttachment(TIMEOUT_KEY, invoker.getUrl().getMethodPositiveParameter(invocation.getMethodName(), TIMEOUT_KEY, DEFAULT_TIMEOUT));
+        invocation.setAttachment(TIMEOUT_KEY, calculateTimeout(invocation, invocation.getMethodName()));
 
         String desc = ReflectUtils.getDesc(invocation.getParameterTypes());
 
@@ -138,16 +143,35 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
             } finally {
                 InternalThreadLocalMap.set(originTL);
             }
-            if (result.hasException()) {
-                AsyncRpcResult rpcResult = AsyncRpcResult.newDefaultAsyncResult(result.getException(), copiedInvocation);
-                rpcResult.setObjectAttachments(new HashMap<>(result.getObjectAttachments()));
-                return rpcResult;
+            CompletableFuture<AppResponse> future = new CompletableFuture<>();
+            AppResponse rpcResult = new AppResponse(copiedInvocation);
+            if (result instanceof AsyncRpcResult) {
+                result.whenCompleteWithContext((r, t) -> {
+                    if (t != null) {
+                        rpcResult.setException(t);
+                    } else {
+                        if (r.hasException()) {
+                            rpcResult.setException(r.getException());
+                        } else {
+                            Object rebuildValue = rebuildValue(invocation, desc, r.getValue());
+                            rpcResult.setValue(rebuildValue);
+                        }
+                    }
+                    rpcResult.setObjectAttachments(new HashMap<>(r.getObjectAttachments()));
+                    future.complete(rpcResult);
+                });
             } else {
-                rebuildValue(invocation, desc, result);
-                AsyncRpcResult rpcResult = AsyncRpcResult.newDefaultAsyncResult(result.getValue(), copiedInvocation);
+                if (result.hasException()) {
+                    rpcResult.setException(result.getException());
+                } else {
+                    Object rebuildValue = rebuildValue(invocation, desc, result.getValue());
+                    rpcResult.setValue(rebuildValue);
+                }
                 rpcResult.setObjectAttachments(new HashMap<>(result.getObjectAttachments()));
-                return rpcResult;
+                future.complete(rpcResult);
             }
+            return new AsyncRpcResult(future, invocation);
+
         }
     }
 
@@ -175,7 +199,7 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
         ServiceModel consumerServiceModel = invocation.getServiceModel();
         boolean shouldSkip = shouldIgnoreSameModule && consumerServiceModel != null &&
             Objects.equals(providerServiceModel.getModuleModel(), consumerServiceModel.getModuleModel());
-        if(CommonConstants.$INVOKE.equals(methodName) || shouldSkip) {
+        if (CommonConstants.$INVOKE.equals(methodName) || shouldSkip) {
             // generic invoke, skip copy arguments
             RpcInvocation copiedInvocation = new RpcInvocation(invocation.getTargetServiceUniqueName(),
                 providerServiceModel, methodName, invocation.getServiceName(), invocation.getProtocolServiceKey(),
@@ -222,8 +246,7 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
         }
     }
 
-    private void rebuildValue(Invocation invocation, String desc, Result result) {
-        Object originValue = result.getValue();
+    private Object rebuildValue(Invocation invocation, String desc, Object originValue) {
         Object value = originValue;
         ClassLoader cl = Thread.currentThread().getContextClassLoader();
         try {
@@ -235,7 +258,7 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
                     value = paramDeepCopyUtil.copy(getUrl(), originValue, returnType);
                 }
             }
-            result.setValue(value);
+            return value;
         } finally {
             Thread.currentThread().setContextClassLoader(cl);
         }
@@ -248,4 +271,20 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
         return remoteUrl.getParameter(ASYNC_KEY, false);
     }
 
+    private int calculateTimeout(Invocation invocation, String methodName) {
+        Object countdown = RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
+        int timeout;
+        if (countdown == null) {
+            timeout = (int) RpcUtils.getTimeout(getUrl(), methodName, RpcContext.getClientAttachment(), DEFAULT_TIMEOUT);
+            if (getUrl().getParameter(ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
+                invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout); // pass timeout to remote server
+            }
+        } else {
+            TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
+            timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
+            invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout);// pass timeout to remote server
+        }
+        return timeout;
+    }
+
 }