You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/04/24 03:02:58 UTC

[incubator-dubbo] branch 3.x-dev updated: Result implements future and keep Filter backward compatibility. (#3916)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.x-dev
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/3.x-dev by this push:
     new ed8a36b  Result implements future and keep Filter backward compatibility. (#3916)
ed8a36b is described below

commit ed8a36bf695537040acd4bd5ed17bebab3ff5a92
Author: ken.lj <ke...@gmail.com>
AuthorDate: Wed Apr 24 11:02:49 2019 +0800

    Result implements future and keep Filter backward compatibility. (#3916)
---
 .../main/java/com/alibaba/dubbo/rpc/Result.java    |  34 ++++---
 .../java/org/apache/dubbo/rpc/AbstractResult.java  |  25 +++++
 .../java/org/apache/dubbo/rpc/AppResponse.java     |  26 ++----
 .../java/org/apache/dubbo/rpc/AsyncRpcResult.java  | 102 ++++++++++++---------
 .../src/main/java/org/apache/dubbo/rpc/Filter.java |  13 +++
 .../src/main/java/org/apache/dubbo/rpc/Result.java |  20 ++--
 .../dubbo/rpc/protocol/ProtocolFilterWrapper.java  |   2 +
 .../dubbo/rpc/proxy/AbstractProxyInvoker.java      |   7 +-
 .../dubbo/rpc/filter/ExceptionFilterTest.java      |   5 +-
 .../dubbo/rpc/filter/GenericImplFilterTest.java    |   5 +-
 .../org/apache/dubbo/rpc/support/MyInvoker.java    |   4 +-
 .../rpc/protocol/dubbo/ChannelWrappedInvoker.java  |  12 ++-
 .../dubbo/rpc/protocol/dubbo/DubboInvoker.java     |  14 ++-
 .../dubbo/rpc/protocol/dubbo/DubboProtocol.java    |   2 +-
 .../dubbo/rpc/protocol/dubbo/FutureAdapter.java    |  14 +--
 .../dubbo/rpc/protocol/thrift/ThriftInvoker.java   |   9 +-
 16 files changed, 180 insertions(+), 114 deletions(-)

diff --git a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/Result.java b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/Result.java
index 2253130..07f5df9 100644
--- a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/Result.java
+++ b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/Result.java
@@ -17,11 +17,7 @@
 
 package com.alibaba.dubbo.rpc;
 
-import org.apache.dubbo.rpc.AppResponse;
-
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 
 @Deprecated
@@ -37,23 +33,15 @@ public interface Result extends org.apache.dubbo.rpc.Result {
 
     }
 
-    @Override
-    default org.apache.dubbo.rpc.Result thenApplyWithContext(Function<AppResponse, AppResponse> fn) {
-        return this;
-    }
-
-    @Override
-    default <U> CompletableFuture<U> thenApply(Function<org.apache.dubbo.rpc.Result, ? extends U> fn) {
-        return null;
-    }
+    abstract class AbstractResult extends org.apache.dubbo.rpc.AbstractResult implements Result {
 
-    @Override
-    default org.apache.dubbo.rpc.Result get() throws InterruptedException, ExecutionException {
-        return this;
+        @Override
+        public org.apache.dubbo.rpc.Result thenApplyWithContext(Function<org.apache.dubbo.rpc.Result, org.apache.dubbo.rpc.Result> fn) {
+            return null;
+        }
     }
 
-
-    class CompatibleResult implements Result {
+    class CompatibleResult extends AbstractResult {
         private org.apache.dubbo.rpc.Result delegate;
 
         public CompatibleResult(org.apache.dubbo.rpc.Result result) {
@@ -70,11 +58,21 @@ public interface Result extends org.apache.dubbo.rpc.Result {
         }
 
         @Override
+        public void setValue(Object value) {
+            delegate.setValue(value);
+        }
+
+        @Override
         public Throwable getException() {
             return delegate.getException();
         }
 
         @Override
+        public void setException(Throwable t) {
+            delegate.setException(t);
+        }
+
+        @Override
         public boolean hasException() {
             return delegate.hasException();
         }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AbstractResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AbstractResult.java
new file mode 100644
index 0000000..2db743d
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AbstractResult.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ *
+ */
+public abstract class AbstractResult extends CompletableFuture<Result> implements Result {
+}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
index 6ede9d4..16e9380 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
@@ -19,8 +19,6 @@ package org.apache.dubbo.rpc;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 
 /**
@@ -30,21 +28,19 @@ import java.util.function.Function;
  *     <li>AppResponse only simply represents the business result</li>
  * </ul>
  *
- *  The relationship between them can be reflected in the definition of AsyncRpcResult:
+ *  The relationship between them can be described as follow, an abstraction of the definition of AsyncRpcResult:
  *  <pre>
  *  {@code
- *   Public class AsyncRpcResult implements Result {
- *      private CompletableFuture <AppResponse> resultFuture;
+ *   Public class AsyncRpcResult implements CompletionStage<AppResponse> {
  *       ......
- *   }
  *  }
  * </pre>
- *
- * In theory, AppResponse does not need to implement the {@link Result} interface, this is done mainly for compatibility purpose.
+ * AsyncRpcResult is a future representing an unfinished RPC call, while AppResponse is the actual return type of this call.
+ * In theory, AppResponse does'n have to implement the {@link Result} interface, this is done mainly for compatibility purpose.
  *
  * @serial Do not change the class name and properties.
  */
-public class AppResponse implements Result, Serializable {
+public class AppResponse extends AbstractResult implements Serializable {
 
     private static final long serialVersionUID = -6925924956850004727L;
 
@@ -139,17 +135,7 @@ public class AppResponse implements Result, Serializable {
     }
 
     @Override
-    public Result thenApplyWithContext(Function<AppResponse, AppResponse> fn) {
-        throw new UnsupportedOperationException("AppResponse represents an concrete business response, there will be no status changes, you should get internal values directly.");
-    }
-
-    @Override
-    public <U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn) {
-        throw new UnsupportedOperationException("AppResponse represents an concrete business response, there will be no status changes, you should get internal values directly.");
-    }
-
-    @Override
-    public Result get() throws InterruptedException, ExecutionException {
+    public Result thenApplyWithContext(Function<Result, Result> fn) {
         throw new UnsupportedOperationException("AppResponse represents an concrete business response, there will be no status changes, you should get internal values directly.");
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 789cbe7..e15053a 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -22,10 +22,9 @@ import org.apache.dubbo.common.logger.LoggerFactory;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 
-public class AsyncRpcResult implements Result {
+public class AsyncRpcResult extends AbstractResult {
     private static final Logger logger = LoggerFactory.getLogger(AsyncRpcResult.class);
 
     /**
@@ -35,16 +34,25 @@ public class AsyncRpcResult implements Result {
     private RpcContext storedContext;
     private RpcContext storedServerContext;
 
-    private CompletableFuture<AppResponse> responseFuture;
     private Invocation invocation;
 
-    public AsyncRpcResult(CompletableFuture<AppResponse> future, Invocation invocation) {
-        this.responseFuture = future;
+    public AsyncRpcResult(Invocation invocation) {
         this.invocation = invocation;
         this.storedContext = RpcContext.getContext();
         this.storedServerContext = RpcContext.getServerContext();
     }
 
+    public AsyncRpcResult(AsyncRpcResult asyncRpcResult) {
+        this.invocation = asyncRpcResult.getInvocation();
+        this.storedContext = asyncRpcResult.getStoredContext();
+        this.storedServerContext = asyncRpcResult.getStoredServerContext();
+    }
+
+    /**
+     * Notice the return type of {@link #getValue} is the actual type of the RPC method, not {@link AppResponse}
+     *
+     * @return
+     */
     @Override
     public Object getValue() {
         return getAppResponse().getValue();
@@ -52,7 +60,9 @@ public class AsyncRpcResult implements Result {
 
     @Override
     public void setValue(Object value) {
-
+        AppResponse appResponse = new AppResponse();
+        appResponse.setValue(value);
+        this.complete(appResponse);
     }
 
     @Override
@@ -62,7 +72,9 @@ public class AsyncRpcResult implements Result {
 
     @Override
     public void setException(Throwable t) {
-
+        AppResponse appResponse = new AppResponse();
+        appResponse.setException(t);
+        this.complete(appResponse);
     }
 
     @Override
@@ -70,18 +82,10 @@ public class AsyncRpcResult implements Result {
         return getAppResponse().hasException();
     }
 
-    public CompletableFuture<AppResponse> getResponseFuture() {
-        return responseFuture;
-    }
-
-    public void setResponseFuture(CompletableFuture<AppResponse> responseFuture) {
-        this.responseFuture = responseFuture;
-    }
-
     public Result getAppResponse() {
         try {
-            if (responseFuture.isDone()) {
-                return responseFuture.get();
+            if (this.isDone()) {
+                return this.get();
             }
         } catch (Exception e) {
             // This should never happen;
@@ -97,7 +101,7 @@ public class AsyncRpcResult implements Result {
             AppResponse appResponse = new AppResponse();
             CompletableFuture<Object> future = new CompletableFuture<>();
             appResponse.setValue(future);
-            responseFuture.whenComplete((result, t) -> {
+            this.whenComplete((result, t) -> {
                 if (t != null) {
                     if (t instanceof CompletionException) {
                         t = t.getCause();
@@ -112,25 +116,27 @@ public class AsyncRpcResult implements Result {
                 }
             });
             return appResponse.recreate();
-        } else if (responseFuture.isDone()) {
-            return responseFuture.get().recreate();
+        } else if (this.isDone()) {
+            return this.get().recreate();
         }
         return (new AppResponse()).recreate();
     }
 
-    public Result get() throws InterruptedException, ExecutionException {
-        return responseFuture.get();
+    public Result thenApplyWithContext(Function<Result, Result> fn) {
+        CompletableFuture<Result> future = this.thenApply(fn.compose(beforeContext).andThen(afterContext));
+        AsyncRpcResult nextAsyncRpcResult = new AsyncRpcResult(this);
+        nextAsyncRpcResult.subscribeTo(future);
+        return nextAsyncRpcResult;
     }
 
-    @Override
-    public Result thenApplyWithContext(Function<AppResponse, AppResponse> fn) {
-        this.responseFuture = responseFuture.thenApply(fn.compose(beforeContext).andThen(afterContext));
-        return this;
-    }
-
-    @Override
-    public <U> CompletableFuture<U> thenApply(Function<Result,? extends U> fn) {
-        return this.responseFuture.thenApply(fn);
+    public void subscribeTo(CompletableFuture<?> future) {
+        future.whenComplete((obj, t) -> {
+            if (t != null) {
+                this.completeExceptionally(t);
+            } else {
+                this.complete((Result) obj);
+            }
+        });
     }
 
     @Override
@@ -163,13 +169,25 @@ public class AsyncRpcResult implements Result {
         getAppResponse().setAttachment(key, value);
     }
 
+    public RpcContext getStoredContext() {
+        return storedContext;
+    }
+
+    public RpcContext getStoredServerContext() {
+        return storedServerContext;
+    }
+
+    public Invocation getInvocation() {
+        return invocation;
+    }
+
     /**
      * tmp context to use when the thread switch to Dubbo thread.
      */
     private RpcContext tmpContext;
     private RpcContext tmpServerContext;
 
-    private Function<AppResponse, AppResponse> beforeContext = (appResponse) -> {
+    private Function<Result, Result> beforeContext = (appResponse) -> {
         tmpContext = RpcContext.getContext();
         tmpServerContext = RpcContext.getServerContext();
         RpcContext.restoreContext(storedContext);
@@ -177,7 +195,7 @@ public class AsyncRpcResult implements Result {
         return appResponse;
     };
 
-    private Function<AppResponse, AppResponse> afterContext = (appResponse) -> {
+    private Function<Result, Result> afterContext = (appResponse) -> {
         RpcContext.restoreContext(tmpContext);
         RpcContext.restoreServerContext(tmpServerContext);
         return appResponse;
@@ -186,8 +204,10 @@ public class AsyncRpcResult implements Result {
     /**
      * Some utility methods used to quickly generate default AsyncRpcResult instance.
      */
-    public static AsyncRpcResult newDefaultAsyncResult(AppResponse result, Invocation invocation) {
-        return new AsyncRpcResult(CompletableFuture.completedFuture(result), invocation);
+    public static AsyncRpcResult newDefaultAsyncResult(AppResponse appResponse, Invocation invocation) {
+        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
+        asyncRpcResult.complete(appResponse);
+        return asyncRpcResult;
     }
 
     public static AsyncRpcResult newDefaultAsyncResult(Invocation invocation) {
@@ -203,15 +223,15 @@ public class AsyncRpcResult implements Result {
     }
 
     public static AsyncRpcResult newDefaultAsyncResult(Object value, Throwable t, Invocation invocation) {
-        CompletableFuture<AppResponse> future = new CompletableFuture<>();
-        AppResponse result = new AppResponse();
+        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
+        AppResponse appResponse = new AppResponse();
         if (t != null) {
-            result.setException(t);
+            appResponse.setException(t);
         } else {
-            result.setValue(value);
+            appResponse.setValue(value);
         }
-        future.complete(result);
-        return new AsyncRpcResult(future, invocation);
+        asyncRpcResult.complete(appResponse);
+        return asyncRpcResult;
     }
 }
 
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java
index 3445488..53ad128 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java
@@ -47,6 +47,19 @@ public interface Filter {
      */
     Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
 
+    /**
+     * Filter itself should only be response for passing invocation, all callbacks has been placed into {@link Listener}
+     *
+     * @param appResponse
+     * @param invoker
+     * @param invocation
+     * @return
+     */
+    @Deprecated
+    default Result onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
+        return appResponse;
+    }
+
     interface Listener {
 
         void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Result.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Result.java
index 1ecd9da..9c0c1ab 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Result.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Result.java
@@ -19,7 +19,8 @@ package org.apache.dubbo.rpc;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Future;
 import java.util.function.Function;
 
 
@@ -30,7 +31,7 @@ import java.util.function.Function;
  * @see org.apache.dubbo.rpc.Invoker#invoke(Invocation)
  * @see AppResponse
  */
-public interface Result extends Serializable {
+public interface Result extends CompletionStage<Result>, Future<Result>, Serializable {
 
     /**
      * Get invoke result.
@@ -110,10 +111,17 @@ public interface Result extends Serializable {
 
     void setAttachment(String key, String value);
 
-    Result thenApplyWithContext(Function<AppResponse, AppResponse> fn);
-
-    <U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn);
+    /**
+     * Returns the specified {@code valueIfAbsent} when not complete, or
+     * returns the result value or throws an exception when complete.
+     *
+     * @see CompletableFuture#getNow(Object)
+     */
+    Result getNow(Result valueIfAbsent);
 
-    Result get() throws InterruptedException, ExecutionException;
+    Result thenApplyWithContext(Function<Result, Result> fn);
 
+    default CompletableFuture<Result> completionFuture() {
+        return toCompletableFuture();
+    }
 }
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
index 2106a4e..0ca8ced 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
@@ -90,6 +90,8 @@ public class ProtocolFilterWrapper implements Protocol {
                                 if (listener != null) {
                                     listener.onResponse(r, invoker, invocation);
                                 }
+                            } else {
+                                filter.onResponse(r, invoker, invocation);
                             }
                             return r;
                         });
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
index aa813e1..fe42fb2 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
@@ -84,7 +84,8 @@ public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
         try {
             Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
             CompletableFuture<Object> future = wrapWithFuture(value, invocation);
-            CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
+            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
+            future.whenComplete((obj, t) -> {
                 AppResponse result = new AppResponse();
                 if (t != null) {
                     if (t instanceof CompletionException) {
@@ -95,9 +96,9 @@ public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
                 } else {
                     result.setValue(obj);
                 }
-                return result;
+                asyncRpcResult.complete(result);
             });
-            return new AsyncRpcResult(appResponseFuture, invocation);
+            return asyncRpcResult;
         } catch (InvocationTargetException e) {
             if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
                 logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java
index 49af0c0..bc9c132 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java
@@ -28,12 +28,11 @@ import org.apache.dubbo.rpc.support.DemoService;
 import org.apache.dubbo.rpc.support.LocalException;
 
 import com.alibaba.com.caucho.hessian.HessianException;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
-import java.util.concurrent.CompletableFuture;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.BDDMockito.given;
@@ -121,7 +120,7 @@ public class ExceptionFilterTest {
 
         AppResponse mockRpcResult = new AppResponse();
         mockRpcResult.setException(new HessianException("hessian"));
-        Result mockAsyncResult = new AsyncRpcResult(CompletableFuture.completedFuture(mockRpcResult), invocation);
+        Result mockAsyncResult = AsyncRpcResult.newDefaultAsyncResult(mockRpcResult, invocation);
 
 
         Invoker<DemoService> invoker = mock(Invoker.class);
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java
index 469632b..aa39279 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java
@@ -36,7 +36,6 @@ import org.mockito.Mockito;
 import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.when;
@@ -61,7 +60,7 @@ public class GenericImplFilterTest {
         person.put("age", 10);
 
         AppResponse mockRpcResult = new AppResponse(person);
-        when(invoker.invoke(any(Invocation.class))).thenReturn(new AsyncRpcResult(CompletableFuture.completedFuture(mockRpcResult), invocation));
+        when(invoker.invoke(any(Invocation.class))).thenReturn(AsyncRpcResult.newDefaultAsyncResult(mockRpcResult, invocation));
         when(invoker.getUrl()).thenReturn(url);
         when(invoker.getInterface()).thenReturn(DemoService.class);
 
@@ -84,7 +83,7 @@ public class GenericImplFilterTest {
         Invoker invoker = Mockito.mock(Invoker.class);
 
         AppResponse mockRpcResult = new AppResponse(new GenericException(new RuntimeException("failed")));
-        when(invoker.invoke(any(Invocation.class))).thenReturn(new AsyncRpcResult(CompletableFuture.completedFuture(mockRpcResult), invocation));
+        when(invoker.invoke(any(Invocation.class))).thenReturn(AsyncRpcResult.newDefaultAsyncResult(mockRpcResult, invocation));
         when(invoker.getUrl()).thenReturn(url);
         when(invoker.getInterface()).thenReturn(DemoService.class);
 
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MyInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MyInvoker.java
index 1b74e6b..57f1f1e 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MyInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MyInvoker.java
@@ -24,8 +24,6 @@ import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcException;
 
-import java.util.concurrent.CompletableFuture;
-
 /**
  * MockInvoker.java
  */
@@ -68,7 +66,7 @@ public class MyInvoker<T> implements Invoker<T> {
             result.setException(new RuntimeException("mocked exception"));
         }
 
-        return new AsyncRpcResult(CompletableFuture.completedFuture(result), invocation);
+        return AsyncRpcResult.newDefaultAsyncResult(result, invocation);
     }
 
     @Override
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
index 5a22127..17b2cca 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
@@ -66,8 +66,16 @@ class ChannelWrappedInvoker<T> extends AbstractInvoker<T> {
                 currentClient.send(inv, getUrl().getMethodParameter(invocation.getMethodName(), Constants.SENT_KEY, false));
                 return AsyncRpcResult.newDefaultAsyncResult(invocation);
             } else {
-                CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv).thenApply(obj -> (AppResponse) obj);
-                return new AsyncRpcResult(appResponseFuture, inv);
+                CompletableFuture<Object> responseFuture = currentClient.request(inv);
+                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
+                responseFuture.whenComplete((appResponse, t) -> {
+                    if (t != null) {
+                        asyncRpcResult.completeExceptionally(t);
+                    } else {
+                        asyncRpcResult.complete((AppResponse) appResponse);
+                    }
+                });
+                return asyncRpcResult;
             }
         } catch (RpcException e) {
             throw e;
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index d8513a3..0f77d65 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -87,9 +87,17 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
                 RpcContext.getContext().setFuture(null);
                 return AsyncRpcResult.newDefaultAsyncResult(invocation);
             } else {
-                CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout).thenApply(obj -> (AppResponse) obj);
-                RpcContext.getContext().setFuture(new FutureAdapter(appResponseFuture));
-                return new AsyncRpcResult(appResponseFuture, inv);
+                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
+                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
+                responseFuture.whenComplete((obj, t) -> {
+                    if (t != null) {
+                        asyncRpcResult.completeExceptionally(t);
+                    } else {
+                        asyncRpcResult.complete((AppResponse) obj);
+                    }
+                });
+                RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
+                return asyncRpcResult;
             }
         } catch (TimeoutException e) {
             throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index 926a940..15b0bd2 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -125,7 +125,7 @@ public class DubboProtocol extends AbstractProtocol {
             }
             RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
             Result result = invoker.invoke(inv);
-            return result.thenApply(Function.identity());
+            return result.completionFuture().thenApply(Function.identity());
         }
 
         @Override
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
index 9fc47f6..d00c5fa 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
@@ -16,7 +16,7 @@
  */
 package org.apache.dubbo.rpc.protocol.dubbo;
 
-import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.RpcException;
 
 import java.util.concurrent.CompletableFuture;
@@ -30,21 +30,21 @@ import java.util.concurrent.TimeoutException;
  */
 public class FutureAdapter<V> extends CompletableFuture<V> {
 
-    private CompletableFuture<Result> resultFuture;
+    private CompletableFuture<AppResponse> resultFuture;
 
-    public FutureAdapter(CompletableFuture<Result> future) {
+    public FutureAdapter(CompletableFuture<AppResponse> future) {
         this.resultFuture = future;
-        future.whenComplete((result, t) -> {
+        future.whenComplete((appResponse, t) -> {
             if (t != null) {
                 if (t instanceof CompletionException) {
                     t = t.getCause();
                 }
                 this.completeExceptionally(t);
             } else {
-                if (result.hasException()) {
-                    this.completeExceptionally(result.getException());
+                if (appResponse.hasException()) {
+                    this.completeExceptionally(appResponse.getException());
                 } else {
-                    this.complete((V)result.getValue());
+                    this.complete((V) appResponse.getValue());
                 }
             }
         });
diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
index e02fac3..6eb3c6a 100644
--- a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.common.utils.AtomicPositiveInteger;
 import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.TimeoutException;
 import org.apache.dubbo.remoting.exchange.ExchangeClient;
-import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.AsyncRpcResult;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
@@ -89,9 +88,11 @@ public class ThriftInvoker<T> extends AbstractInvoker<T> {
             int timeout = getUrl().getMethodParameter(
                     methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
 
-            CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout).thenApply(obj -> (AppResponse) obj);
-            RpcContext.getContext().setFuture(new FutureAdapter(appResponseFuture));
-            return new AsyncRpcResult(appResponseFuture, invocation);
+            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
+            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
+            asyncRpcResult.subscribeTo(responseFuture);
+            RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
+            return asyncRpcResult;
         } catch (TimeoutException e) {
             throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e);
         } catch (RemotingException e) {