You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by vi...@apache.org on 2019/07/05 09:31:08 UTC
[dubbo] branch 2.7.3-release updated: FutureAdapter depends on AsyncRpcResult (#4471)
This is an automated email from the ASF dual-hosted git repository.
victory pushed a commit to branch 2.7.3-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/2.7.3-release by this push:
new e3e5460 FutureAdapter depends on AsyncRpcResult (#4471)
e3e5460 is described below
commit e3e5460e4e2d7d26a4e2b66e280c668ecbfbc86d
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri Jul 5 17:30:53 2019 +0800
FutureAdapter depends on AsyncRpcResult (#4471)
#4471
---
.../java/com/alibaba/dubbo/rpc/RpcContext.java | 2 +-
.../dubbo/rpc/protocol/dubbo/FutureAdapter.java | 45 +++++++++++-------
.../java/org/apache/dubbo/rpc/AsyncRpcResult.java | 28 +++---------
.../java/org/apache/dubbo/rpc/FutureContext.java | 33 ++++++++++++--
.../main/java/org/apache/dubbo/rpc/RpcContext.java | 7 +--
.../dubbo/rpc/protocol/dubbo/FutureAdapter.java | 0
.../org/apache/dubbo/rpc/FutureContextTest.java | 53 ++++++++++++++++++++++
.../dubbo/rpc/protocol/dubbo/DubboInvoker.java | 6 +--
.../dubbo/rpc/protocol/thrift/ThriftInvoker.java | 6 +--
9 files changed, 126 insertions(+), 54 deletions(-)
diff --git a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
index 010eae8..c21b701 100644
--- a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
+++ b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
@@ -57,7 +57,7 @@ public class RpcContext extends org.apache.dubbo.rpc.RpcContext {
}
public <T> Future<T> getFuture() {
- CompletableFuture completableFuture = FutureContext.getCompletableFuture();
+ CompletableFuture completableFuture = FutureContext.getContext().getCompatibleCompletableFuture();
if (completableFuture == null) {
return null;
}
diff --git a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/FutureAdapter.java b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/FutureAdapter.java
index 58a62a4..427fb7c 100644
--- a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/FutureAdapter.java
+++ b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/FutureAdapter.java
@@ -18,10 +18,12 @@
package com.alibaba.dubbo.rpc.protocol.dubbo;
import org.apache.dubbo.rpc.AppResponse;
+import org.apache.dubbo.rpc.Result;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
import com.alibaba.dubbo.remoting.exchange.ResponseFuture;
+import com.alibaba.dubbo.rpc.RpcException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -36,11 +38,11 @@ import java.util.function.BiConsumer;
*/
@Deprecated
public class FutureAdapter<V> implements Future<V> {
- private CompletableFuture<V> future;
- public FutureAdapter(CompletableFuture<V> future) {
- this.future = future;
+ private CompletableFuture<Object> future;
+ public FutureAdapter(CompletableFuture<Object> future) {
+ this.future = future;
}
public ResponseFuture getFuture() {
@@ -48,7 +50,7 @@ public class FutureAdapter<V> implements Future<V> {
@Override
public Object get() throws RemotingException {
try {
- return FutureAdapter.this.get();
+ return future.get();
} catch (InterruptedException e) {
throw new RemotingException(e);
} catch (ExecutionException e) {
@@ -59,7 +61,7 @@ public class FutureAdapter<V> implements Future<V> {
@Override
public Object get(int timeoutInMillis) throws RemotingException {
try {
- return FutureAdapter.this.get(timeoutInMillis, TimeUnit.MILLISECONDS);
+ return future.get(timeoutInMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RemotingException(e);
} catch (ExecutionException e) {
@@ -76,26 +78,23 @@ public class FutureAdapter<V> implements Future<V> {
@Override
public boolean isDone() {
- return FutureAdapter.this.isDone();
+ return future.isDone();
}
};
}
void setCallback(ResponseCallback callback) {
- if (!(future instanceof org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter)) {
- return;
- }
- org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter futureAdapter = (org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter) future;
- BiConsumer<AppResponse, ? super Throwable> biConsumer = new BiConsumer<AppResponse, Throwable>() {
+ BiConsumer<Object, ? super Throwable> biConsumer = new BiConsumer<Object, Throwable>() {
@Override
- public void accept(AppResponse appResponse, Throwable t) {
+ public void accept(Object obj, Throwable t) {
if (t != null) {
if (t instanceof CompletionException) {
t = t.getCause();
}
callback.caught(t);
} else {
+ AppResponse appResponse = (AppResponse)obj;
if (appResponse.hasException()) {
callback.caught(appResponse.getException());
} else {
@@ -104,15 +103,15 @@ public class FutureAdapter<V> implements Future<V> {
}
}
};
- futureAdapter.getAppResponseFuture().whenComplete(biConsumer);
+ future.whenComplete(biConsumer);
}
public boolean cancel(boolean mayInterruptIfRunning) {
- return future.cancel(mayInterruptIfRunning);
+ return false;
}
public boolean isCancelled() {
- return future.isCancelled();
+ return false;
}
public boolean isDone() {
@@ -121,11 +120,23 @@ public class FutureAdapter<V> implements Future<V> {
@SuppressWarnings("unchecked")
public V get() throws InterruptedException, ExecutionException {
- return future.get();
+ try {
+ return (V) (((Result) future.get()).recreate());
+ } catch (InterruptedException | ExecutionException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw new RpcException(e);
+ }
}
@SuppressWarnings("unchecked")
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- return future.get(timeout, unit);
+ try {
+ return (V) (((Result) future.get(timeout, unit)).recreate());
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw new RpcException(e);
+ }
}
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 2714409..ef1adc4 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -18,10 +18,10 @@ package org.apache.dubbo.rpc;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;
/**
@@ -139,29 +139,13 @@ public class AsyncRpcResult extends AbstractResult {
@Override
public Object recreate() throws Throwable {
RpcInvocation rpcInvocation = (RpcInvocation) invocation;
+ FutureAdapter future = new FutureAdapter(this);
+ RpcContext.getContext().setFuture(future);
if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
- AppResponse appResponse = new AppResponse();
- CompletableFuture<Object> future = new CompletableFuture<>();
- appResponse.setValue(future);
- this.whenComplete((result, t) -> {
- if (t != null) {
- if (t instanceof CompletionException) {
- t = t.getCause();
- }
- future.completeExceptionally(t);
- } else {
- if (result.hasException()) {
- future.completeExceptionally(result.getException());
- } else {
- future.complete(result.getValue());
- }
- }
- });
- return appResponse.recreate();
- } else if (this.isDone()) {
- return this.get().recreate();
+ return future;
}
- return (new AppResponse()).recreate();
+
+ return getAppResponse().recreate();
}
@Override
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureContext.java
index 8d5a1b6..2ac608a 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureContext.java
@@ -29,7 +29,19 @@ import java.util.concurrent.CompletableFuture;
*/
public class FutureContext {
- public static InternalThreadLocal<CompletableFuture<?>> futureTL = new InternalThreadLocal<>();
+ private static InternalThreadLocal<FutureContext> futureTL = new InternalThreadLocal<FutureContext>() {
+ @Override
+ protected FutureContext initialValue() {
+ return new FutureContext();
+ }
+ };
+
+ public static FutureContext getContext() {
+ return futureTL.get();
+ }
+
+ private CompletableFuture<?> future;
+ private CompletableFuture<?> compatibleFuture;
/**
* get future.
@@ -38,8 +50,8 @@ public class FutureContext {
* @return future
*/
@SuppressWarnings("unchecked")
- public static <T> CompletableFuture<T> getCompletableFuture() {
- return (CompletableFuture<T>) futureTL.get();
+ public <T> CompletableFuture<T> getCompletableFuture() {
+ return (CompletableFuture<T>) future;
}
/**
@@ -47,8 +59,19 @@ public class FutureContext {
*
* @param future
*/
- public static void setFuture(CompletableFuture<?> future) {
- futureTL.set(future);
+ public void setFuture(CompletableFuture<?> future) {
+ this.future = future;
+ }
+
+ @Deprecated
+ @SuppressWarnings("unchecked")
+ public <T> CompletableFuture<T> getCompatibleCompletableFuture() {
+ return (CompletableFuture<T>) compatibleFuture;
+ }
+
+ @Deprecated
+ public void setCompatibleFuture(CompletableFuture<?> compatibleFuture) {
+ this.compatibleFuture = compatibleFuture;
}
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
index 5022170..1066c7d 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
@@ -105,6 +105,7 @@ public class RpcContext {
private Object response;
private AsyncContext asyncContext;
+
protected RpcContext() {
}
@@ -224,7 +225,7 @@ public class RpcContext {
*/
@SuppressWarnings("unchecked")
public <T> CompletableFuture<T> getCompletableFuture() {
- return FutureContext.getCompletableFuture();
+ return FutureContext.getContext().getCompletableFuture();
}
/**
@@ -235,7 +236,7 @@ public class RpcContext {
*/
@SuppressWarnings("unchecked")
public <T> Future<T> getFuture() {
- return FutureContext.getCompletableFuture();
+ return FutureContext.getContext().getCompletableFuture();
}
/**
@@ -244,7 +245,7 @@ public class RpcContext {
* @param future
*/
public void setFuture(CompletableFuture<?> future) {
- FutureContext.setFuture(future);
+ FutureContext.getContext().setFuture(future);
}
public List<URL> getUrls() {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
similarity index 100%
rename from dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
rename to dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/FutureContextTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/FutureContextTest.java
new file mode 100644
index 0000000..1a907dc
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/FutureContextTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ *
+ */
+public class FutureContextTest {
+
+ @Test
+ public void testFutureContext() throws Exception {
+ Thread thread1 = new Thread(() -> {
+ FutureContext.getContext().setFuture(CompletableFuture.completedFuture("future from thread1"));
+ try {
+ Thread.sleep(500);
+ Assertions.assertEquals("future from thread1", FutureContext.getContext().getCompletableFuture().get());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread1.start();
+
+ Thread.sleep(100);
+
+ Thread thread2 = new Thread(() -> {
+ CompletableFuture future = FutureContext.getContext().getCompletableFuture();
+ Assertions.assertNull(future);
+ FutureContext.getContext().setFuture(CompletableFuture.completedFuture("future from thread2"));
+ });
+ thread2.start();
+
+ Thread.sleep(1000);
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index 3fed73f..2e91224 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -24,10 +24,10 @@ import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.TimeoutException;
import org.apache.dubbo.remoting.exchange.ExchangeClient;
import org.apache.dubbo.rpc.AsyncRpcResult;
+import org.apache.dubbo.rpc.FutureContext;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
@@ -91,13 +91,13 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
- RpcContext.getContext().setFuture(null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
asyncRpcResult.subscribeTo(responseFuture);
- RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
+ // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
+ FutureContext.getContext().setCompatibleFuture(responseFuture);
return asyncRpcResult;
}
} catch (TimeoutException e) {
diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
index 3b0e3d9..c36213b 100644
--- a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
@@ -23,14 +23,13 @@ import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.TimeoutException;
import org.apache.dubbo.remoting.exchange.ExchangeClient;
import org.apache.dubbo.rpc.AsyncRpcResult;
+import org.apache.dubbo.rpc.FutureContext;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
-import org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -97,7 +96,8 @@ public class ThriftInvoker<T> extends AbstractInvoker<T> {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
asyncRpcResult.subscribeTo(responseFuture);
- RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
+ // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
+ FutureContext.getContext().setCompatibleFuture(responseFuture);
return asyncRpcResult;
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e);