You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/04/24 03:02:58 UTC
[incubator-dubbo] branch 3.x-dev updated: Result implements future
and keep Filter backward compatibility. (#3916)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.x-dev
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/3.x-dev by this push:
new ed8a36b Result implements future and keep Filter backward compatibility. (#3916)
ed8a36b is described below
commit ed8a36bf695537040acd4bd5ed17bebab3ff5a92
Author: ken.lj <ke...@gmail.com>
AuthorDate: Wed Apr 24 11:02:49 2019 +0800
Result implements future and keep Filter backward compatibility. (#3916)
---
.../main/java/com/alibaba/dubbo/rpc/Result.java | 34 ++++---
.../java/org/apache/dubbo/rpc/AbstractResult.java | 25 +++++
.../java/org/apache/dubbo/rpc/AppResponse.java | 26 ++----
.../java/org/apache/dubbo/rpc/AsyncRpcResult.java | 102 ++++++++++++---------
.../src/main/java/org/apache/dubbo/rpc/Filter.java | 13 +++
.../src/main/java/org/apache/dubbo/rpc/Result.java | 20 ++--
.../dubbo/rpc/protocol/ProtocolFilterWrapper.java | 2 +
.../dubbo/rpc/proxy/AbstractProxyInvoker.java | 7 +-
.../dubbo/rpc/filter/ExceptionFilterTest.java | 5 +-
.../dubbo/rpc/filter/GenericImplFilterTest.java | 5 +-
.../org/apache/dubbo/rpc/support/MyInvoker.java | 4 +-
.../rpc/protocol/dubbo/ChannelWrappedInvoker.java | 12 ++-
.../dubbo/rpc/protocol/dubbo/DubboInvoker.java | 14 ++-
.../dubbo/rpc/protocol/dubbo/DubboProtocol.java | 2 +-
.../dubbo/rpc/protocol/dubbo/FutureAdapter.java | 14 +--
.../dubbo/rpc/protocol/thrift/ThriftInvoker.java | 9 +-
16 files changed, 180 insertions(+), 114 deletions(-)
diff --git a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/Result.java b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/Result.java
index 2253130..07f5df9 100644
--- a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/Result.java
+++ b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/Result.java
@@ -17,11 +17,7 @@
package com.alibaba.dubbo.rpc;
-import org.apache.dubbo.rpc.AppResponse;
-
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.function.Function;
@Deprecated
@@ -37,23 +33,15 @@ public interface Result extends org.apache.dubbo.rpc.Result {
}
- @Override
- default org.apache.dubbo.rpc.Result thenApplyWithContext(Function<AppResponse, AppResponse> fn) {
- return this;
- }
-
- @Override
- default <U> CompletableFuture<U> thenApply(Function<org.apache.dubbo.rpc.Result, ? extends U> fn) {
- return null;
- }
+ abstract class AbstractResult extends org.apache.dubbo.rpc.AbstractResult implements Result {
- @Override
- default org.apache.dubbo.rpc.Result get() throws InterruptedException, ExecutionException {
- return this;
+ @Override
+ public org.apache.dubbo.rpc.Result thenApplyWithContext(Function<org.apache.dubbo.rpc.Result, org.apache.dubbo.rpc.Result> fn) {
+ return null;
+ }
}
-
- class CompatibleResult implements Result {
+ class CompatibleResult extends AbstractResult {
private org.apache.dubbo.rpc.Result delegate;
public CompatibleResult(org.apache.dubbo.rpc.Result result) {
@@ -70,11 +58,21 @@ public interface Result extends org.apache.dubbo.rpc.Result {
}
@Override
+ public void setValue(Object value) {
+ delegate.setValue(value);
+ }
+
+ @Override
public Throwable getException() {
return delegate.getException();
}
@Override
+ public void setException(Throwable t) {
+ delegate.setException(t);
+ }
+
+ @Override
public boolean hasException() {
return delegate.hasException();
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AbstractResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AbstractResult.java
new file mode 100644
index 0000000..2db743d
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AbstractResult.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ *
+ */
+public abstract class AbstractResult extends CompletableFuture<Result> implements Result {
+}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
index 6ede9d4..16e9380 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
@@ -19,8 +19,6 @@ package org.apache.dubbo.rpc;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.function.Function;
/**
@@ -30,21 +28,19 @@ import java.util.function.Function;
* <li>AppResponse only simply represents the business result</li>
* </ul>
*
- * The relationship between them can be reflected in the definition of AsyncRpcResult:
+ * The relationship between them can be described as follows, an abstraction of the definition of AsyncRpcResult:
* <pre>
* {@code
- * Public class AsyncRpcResult implements Result {
- * private CompletableFuture <AppResponse> resultFuture;
+ * Public class AsyncRpcResult implements CompletionStage<AppResponse> {
* ......
- * }
* }
* </pre>
- *
- * In theory, AppResponse does not need to implement the {@link Result} interface, this is done mainly for compatibility purpose.
+ * AsyncRpcResult is a future representing an unfinished RPC call, while AppResponse is the actual return type of this call.
+ * In theory, AppResponse doesn't have to implement the {@link Result} interface; this is done mainly for compatibility purposes.
*
* @serial Do not change the class name and properties.
*/
-public class AppResponse implements Result, Serializable {
+public class AppResponse extends AbstractResult implements Serializable {
private static final long serialVersionUID = -6925924956850004727L;
@@ -139,17 +135,7 @@ public class AppResponse implements Result, Serializable {
}
@Override
- public Result thenApplyWithContext(Function<AppResponse, AppResponse> fn) {
- throw new UnsupportedOperationException("AppResponse represents an concrete business response, there will be no status changes, you should get internal values directly.");
- }
-
- @Override
- public <U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn) {
- throw new UnsupportedOperationException("AppResponse represents an concrete business response, there will be no status changes, you should get internal values directly.");
- }
-
- @Override
- public Result get() throws InterruptedException, ExecutionException {
+ public Result thenApplyWithContext(Function<Result, Result> fn) {
throw new UnsupportedOperationException("AppResponse represents an concrete business response, there will be no status changes, you should get internal values directly.");
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 789cbe7..e15053a 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -22,10 +22,9 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
import java.util.function.Function;
-public class AsyncRpcResult implements Result {
+public class AsyncRpcResult extends AbstractResult {
private static final Logger logger = LoggerFactory.getLogger(AsyncRpcResult.class);
/**
@@ -35,16 +34,25 @@ public class AsyncRpcResult implements Result {
private RpcContext storedContext;
private RpcContext storedServerContext;
- private CompletableFuture<AppResponse> responseFuture;
private Invocation invocation;
- public AsyncRpcResult(CompletableFuture<AppResponse> future, Invocation invocation) {
- this.responseFuture = future;
+ public AsyncRpcResult(Invocation invocation) {
this.invocation = invocation;
this.storedContext = RpcContext.getContext();
this.storedServerContext = RpcContext.getServerContext();
}
+ public AsyncRpcResult(AsyncRpcResult asyncRpcResult) {
+ this.invocation = asyncRpcResult.getInvocation();
+ this.storedContext = asyncRpcResult.getStoredContext();
+ this.storedServerContext = asyncRpcResult.getStoredServerContext();
+ }
+
+ /**
+ * Notice the return type of {@link #getValue} is the actual type of the RPC method, not {@link AppResponse}
+ *
+ * @return
+ */
@Override
public Object getValue() {
return getAppResponse().getValue();
@@ -52,7 +60,9 @@ public class AsyncRpcResult implements Result {
@Override
public void setValue(Object value) {
-
+ AppResponse appResponse = new AppResponse();
+ appResponse.setValue(value);
+ this.complete(appResponse);
}
@Override
@@ -62,7 +72,9 @@ public class AsyncRpcResult implements Result {
@Override
public void setException(Throwable t) {
-
+ AppResponse appResponse = new AppResponse();
+ appResponse.setException(t);
+ this.complete(appResponse);
}
@Override
@@ -70,18 +82,10 @@ public class AsyncRpcResult implements Result {
return getAppResponse().hasException();
}
- public CompletableFuture<AppResponse> getResponseFuture() {
- return responseFuture;
- }
-
- public void setResponseFuture(CompletableFuture<AppResponse> responseFuture) {
- this.responseFuture = responseFuture;
- }
-
public Result getAppResponse() {
try {
- if (responseFuture.isDone()) {
- return responseFuture.get();
+ if (this.isDone()) {
+ return this.get();
}
} catch (Exception e) {
// This should never happen;
@@ -97,7 +101,7 @@ public class AsyncRpcResult implements Result {
AppResponse appResponse = new AppResponse();
CompletableFuture<Object> future = new CompletableFuture<>();
appResponse.setValue(future);
- responseFuture.whenComplete((result, t) -> {
+ this.whenComplete((result, t) -> {
if (t != null) {
if (t instanceof CompletionException) {
t = t.getCause();
@@ -112,25 +116,27 @@ public class AsyncRpcResult implements Result {
}
});
return appResponse.recreate();
- } else if (responseFuture.isDone()) {
- return responseFuture.get().recreate();
+ } else if (this.isDone()) {
+ return this.get().recreate();
}
return (new AppResponse()).recreate();
}
- public Result get() throws InterruptedException, ExecutionException {
- return responseFuture.get();
+ public Result thenApplyWithContext(Function<Result, Result> fn) {
+ CompletableFuture<Result> future = this.thenApply(fn.compose(beforeContext).andThen(afterContext));
+ AsyncRpcResult nextAsyncRpcResult = new AsyncRpcResult(this);
+ nextAsyncRpcResult.subscribeTo(future);
+ return nextAsyncRpcResult;
}
- @Override
- public Result thenApplyWithContext(Function<AppResponse, AppResponse> fn) {
- this.responseFuture = responseFuture.thenApply(fn.compose(beforeContext).andThen(afterContext));
- return this;
- }
-
- @Override
- public <U> CompletableFuture<U> thenApply(Function<Result,? extends U> fn) {
- return this.responseFuture.thenApply(fn);
+ public void subscribeTo(CompletableFuture<?> future) {
+ future.whenComplete((obj, t) -> {
+ if (t != null) {
+ this.completeExceptionally(t);
+ } else {
+ this.complete((Result) obj);
+ }
+ });
}
@Override
@@ -163,13 +169,25 @@ public class AsyncRpcResult implements Result {
getAppResponse().setAttachment(key, value);
}
+ public RpcContext getStoredContext() {
+ return storedContext;
+ }
+
+ public RpcContext getStoredServerContext() {
+ return storedServerContext;
+ }
+
+ public Invocation getInvocation() {
+ return invocation;
+ }
+
/**
* tmp context to use when the thread switch to Dubbo thread.
*/
private RpcContext tmpContext;
private RpcContext tmpServerContext;
- private Function<AppResponse, AppResponse> beforeContext = (appResponse) -> {
+ private Function<Result, Result> beforeContext = (appResponse) -> {
tmpContext = RpcContext.getContext();
tmpServerContext = RpcContext.getServerContext();
RpcContext.restoreContext(storedContext);
@@ -177,7 +195,7 @@ public class AsyncRpcResult implements Result {
return appResponse;
};
- private Function<AppResponse, AppResponse> afterContext = (appResponse) -> {
+ private Function<Result, Result> afterContext = (appResponse) -> {
RpcContext.restoreContext(tmpContext);
RpcContext.restoreServerContext(tmpServerContext);
return appResponse;
@@ -186,8 +204,10 @@ public class AsyncRpcResult implements Result {
/**
* Some utility methods used to quickly generate default AsyncRpcResult instance.
*/
- public static AsyncRpcResult newDefaultAsyncResult(AppResponse result, Invocation invocation) {
- return new AsyncRpcResult(CompletableFuture.completedFuture(result), invocation);
+ public static AsyncRpcResult newDefaultAsyncResult(AppResponse appResponse, Invocation invocation) {
+ AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
+ asyncRpcResult.complete(appResponse);
+ return asyncRpcResult;
}
public static AsyncRpcResult newDefaultAsyncResult(Invocation invocation) {
@@ -203,15 +223,15 @@ public class AsyncRpcResult implements Result {
}
public static AsyncRpcResult newDefaultAsyncResult(Object value, Throwable t, Invocation invocation) {
- CompletableFuture<AppResponse> future = new CompletableFuture<>();
- AppResponse result = new AppResponse();
+ AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
+ AppResponse appResponse = new AppResponse();
if (t != null) {
- result.setException(t);
+ appResponse.setException(t);
} else {
- result.setValue(value);
+ appResponse.setValue(value);
}
- future.complete(result);
- return new AsyncRpcResult(future, invocation);
+ asyncRpcResult.complete(appResponse);
+ return asyncRpcResult;
}
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java
index 3445488..53ad128 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java
@@ -47,6 +47,19 @@ public interface Filter {
*/
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
+ /**
+ * Filter itself should only be responsible for passing the invocation; all callbacks have been placed into {@link Listener}
+ *
+ * @param appResponse
+ * @param invoker
+ * @param invocation
+ * @return
+ */
+ @Deprecated
+ default Result onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
+ return appResponse;
+ }
+
interface Listener {
void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Result.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Result.java
index 1ecd9da..9c0c1ab 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Result.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Result.java
@@ -19,7 +19,8 @@ package org.apache.dubbo.rpc;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Future;
import java.util.function.Function;
@@ -30,7 +31,7 @@ import java.util.function.Function;
* @see org.apache.dubbo.rpc.Invoker#invoke(Invocation)
* @see AppResponse
*/
-public interface Result extends Serializable {
+public interface Result extends CompletionStage<Result>, Future<Result>, Serializable {
/**
* Get invoke result.
@@ -110,10 +111,17 @@ public interface Result extends Serializable {
void setAttachment(String key, String value);
- Result thenApplyWithContext(Function<AppResponse, AppResponse> fn);
-
- <U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn);
+ /**
+ * Returns the specified {@code valueIfAbsent} when not complete, or
+ * returns the result value or throws an exception when complete.
+ *
+ * @see CompletableFuture#getNow(Object)
+ */
+ Result getNow(Result valueIfAbsent);
- Result get() throws InterruptedException, ExecutionException;
+ Result thenApplyWithContext(Function<Result, Result> fn);
+ default CompletableFuture<Result> completionFuture() {
+ return toCompletableFuture();
+ }
}
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
index 2106a4e..0ca8ced 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
@@ -90,6 +90,8 @@ public class ProtocolFilterWrapper implements Protocol {
if (listener != null) {
listener.onResponse(r, invoker, invocation);
}
+ } else {
+ filter.onResponse(r, invoker, invocation);
}
return r;
});
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
index aa813e1..fe42fb2 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
@@ -84,7 +84,8 @@ public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
try {
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value, invocation);
- CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
+ AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
+ future.whenComplete((obj, t) -> {
AppResponse result = new AppResponse();
if (t != null) {
if (t instanceof CompletionException) {
@@ -95,9 +96,9 @@ public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
} else {
result.setValue(obj);
}
- return result;
+ asyncRpcResult.complete(result);
});
- return new AsyncRpcResult(appResponseFuture, invocation);
+ return asyncRpcResult;
} catch (InvocationTargetException e) {
if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java
index 49af0c0..bc9c132 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java
@@ -28,12 +28,11 @@ import org.apache.dubbo.rpc.support.DemoService;
import org.apache.dubbo.rpc.support.LocalException;
import com.alibaba.com.caucho.hessian.HessianException;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-import java.util.concurrent.CompletableFuture;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
@@ -121,7 +120,7 @@ public class ExceptionFilterTest {
AppResponse mockRpcResult = new AppResponse();
mockRpcResult.setException(new HessianException("hessian"));
- Result mockAsyncResult = new AsyncRpcResult(CompletableFuture.completedFuture(mockRpcResult), invocation);
+ Result mockAsyncResult = AsyncRpcResult.newDefaultAsyncResult(mockRpcResult, invocation);
Invoker<DemoService> invoker = mock(Invoker.class);
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java
index 469632b..aa39279 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java
@@ -36,7 +36,6 @@ import org.mockito.Mockito;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.when;
@@ -61,7 +60,7 @@ public class GenericImplFilterTest {
person.put("age", 10);
AppResponse mockRpcResult = new AppResponse(person);
- when(invoker.invoke(any(Invocation.class))).thenReturn(new AsyncRpcResult(CompletableFuture.completedFuture(mockRpcResult), invocation));
+ when(invoker.invoke(any(Invocation.class))).thenReturn(AsyncRpcResult.newDefaultAsyncResult(mockRpcResult, invocation));
when(invoker.getUrl()).thenReturn(url);
when(invoker.getInterface()).thenReturn(DemoService.class);
@@ -84,7 +83,7 @@ public class GenericImplFilterTest {
Invoker invoker = Mockito.mock(Invoker.class);
AppResponse mockRpcResult = new AppResponse(new GenericException(new RuntimeException("failed")));
- when(invoker.invoke(any(Invocation.class))).thenReturn(new AsyncRpcResult(CompletableFuture.completedFuture(mockRpcResult), invocation));
+ when(invoker.invoke(any(Invocation.class))).thenReturn(AsyncRpcResult.newDefaultAsyncResult(mockRpcResult, invocation));
when(invoker.getUrl()).thenReturn(url);
when(invoker.getInterface()).thenReturn(DemoService.class);
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MyInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MyInvoker.java
index 1b74e6b..57f1f1e 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MyInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MyInvoker.java
@@ -24,8 +24,6 @@ import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
-import java.util.concurrent.CompletableFuture;
-
/**
* MockInvoker.java
*/
@@ -68,7 +66,7 @@ public class MyInvoker<T> implements Invoker<T> {
result.setException(new RuntimeException("mocked exception"));
}
- return new AsyncRpcResult(CompletableFuture.completedFuture(result), invocation);
+ return AsyncRpcResult.newDefaultAsyncResult(result, invocation);
}
@Override
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
index 5a22127..17b2cca 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
@@ -66,8 +66,16 @@ class ChannelWrappedInvoker<T> extends AbstractInvoker<T> {
currentClient.send(inv, getUrl().getMethodParameter(invocation.getMethodName(), Constants.SENT_KEY, false));
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
- CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv).thenApply(obj -> (AppResponse) obj);
- return new AsyncRpcResult(appResponseFuture, inv);
+ CompletableFuture<Object> responseFuture = currentClient.request(inv);
+ AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
+ responseFuture.whenComplete((appResponse, t) -> {
+ if (t != null) {
+ asyncRpcResult.completeExceptionally(t);
+ } else {
+ asyncRpcResult.complete((AppResponse) appResponse);
+ }
+ });
+ return asyncRpcResult;
}
} catch (RpcException e) {
throw e;
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index d8513a3..0f77d65 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -87,9 +87,17 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
RpcContext.getContext().setFuture(null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
- CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout).thenApply(obj -> (AppResponse) obj);
- RpcContext.getContext().setFuture(new FutureAdapter(appResponseFuture));
- return new AsyncRpcResult(appResponseFuture, inv);
+ AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
+ CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
+ responseFuture.whenComplete((obj, t) -> {
+ if (t != null) {
+ asyncRpcResult.completeExceptionally(t);
+ } else {
+ asyncRpcResult.complete((AppResponse) obj);
+ }
+ });
+ RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
+ return asyncRpcResult;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index 926a940..15b0bd2 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -125,7 +125,7 @@ public class DubboProtocol extends AbstractProtocol {
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
- return result.thenApply(Function.identity());
+ return result.completionFuture().thenApply(Function.identity());
}
@Override
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
index 9fc47f6..d00c5fa 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
@@ -16,7 +16,7 @@
*/
package org.apache.dubbo.rpc.protocol.dubbo;
-import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.RpcException;
import java.util.concurrent.CompletableFuture;
@@ -30,21 +30,21 @@ import java.util.concurrent.TimeoutException;
*/
public class FutureAdapter<V> extends CompletableFuture<V> {
- private CompletableFuture<Result> resultFuture;
+ private CompletableFuture<AppResponse> resultFuture;
- public FutureAdapter(CompletableFuture<Result> future) {
+ public FutureAdapter(CompletableFuture<AppResponse> future) {
this.resultFuture = future;
- future.whenComplete((result, t) -> {
+ future.whenComplete((appResponse, t) -> {
if (t != null) {
if (t instanceof CompletionException) {
t = t.getCause();
}
this.completeExceptionally(t);
} else {
- if (result.hasException()) {
- this.completeExceptionally(result.getException());
+ if (appResponse.hasException()) {
+ this.completeExceptionally(appResponse.getException());
} else {
- this.complete((V)result.getValue());
+ this.complete((V) appResponse.getValue());
}
}
});
diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
index e02fac3..6eb3c6a 100644
--- a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.common.utils.AtomicPositiveInteger;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.TimeoutException;
import org.apache.dubbo.remoting.exchange.ExchangeClient;
-import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
@@ -89,9 +88,11 @@ public class ThriftInvoker<T> extends AbstractInvoker<T> {
int timeout = getUrl().getMethodParameter(
methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
- CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout).thenApply(obj -> (AppResponse) obj);
- RpcContext.getContext().setFuture(new FutureAdapter(appResponseFuture));
- return new AsyncRpcResult(appResponseFuture, invocation);
+ AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
+ CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
+ asyncRpcResult.subscribeTo(responseFuture);
+ RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
+ return asyncRpcResult;
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e);
} catch (RemotingException e) {