You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by vi...@apache.org on 2019/07/05 09:31:08 UTC

[dubbo] branch 2.7.3-release updated: FutureAdapter depends on AsyncRpcResult (#4471)

This is an automated email from the ASF dual-hosted git repository.

victory pushed a commit to branch 2.7.3-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/2.7.3-release by this push:
     new e3e5460  FutureAdapter depends on AsyncRpcResult (#4471)
e3e5460 is described below

commit e3e5460e4e2d7d26a4e2b66e280c668ecbfbc86d
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri Jul 5 17:30:53 2019 +0800

    FutureAdapter depends on AsyncRpcResult (#4471)
    
    #4471
---
 .../java/com/alibaba/dubbo/rpc/RpcContext.java     |  2 +-
 .../dubbo/rpc/protocol/dubbo/FutureAdapter.java    | 45 +++++++++++-------
 .../java/org/apache/dubbo/rpc/AsyncRpcResult.java  | 28 +++---------
 .../java/org/apache/dubbo/rpc/FutureContext.java   | 33 ++++++++++++--
 .../main/java/org/apache/dubbo/rpc/RpcContext.java |  7 +--
 .../dubbo/rpc/protocol/dubbo/FutureAdapter.java    |  0
 .../org/apache/dubbo/rpc/FutureContextTest.java    | 53 ++++++++++++++++++++++
 .../dubbo/rpc/protocol/dubbo/DubboInvoker.java     |  6 +--
 .../dubbo/rpc/protocol/thrift/ThriftInvoker.java   |  6 +--
 9 files changed, 126 insertions(+), 54 deletions(-)

diff --git a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
index 010eae8..c21b701 100644
--- a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
+++ b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
@@ -57,7 +57,7 @@ public class RpcContext extends org.apache.dubbo.rpc.RpcContext {
     }
 
     public <T> Future<T> getFuture() {
-        CompletableFuture completableFuture = FutureContext.getCompletableFuture();
+        CompletableFuture completableFuture = FutureContext.getContext().getCompatibleCompletableFuture();
         if (completableFuture == null) {
             return null;
         }
diff --git a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/FutureAdapter.java b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/FutureAdapter.java
index 58a62a4..427fb7c 100644
--- a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/FutureAdapter.java
+++ b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/FutureAdapter.java
@@ -18,10 +18,12 @@
 package com.alibaba.dubbo.rpc.protocol.dubbo;
 
 import org.apache.dubbo.rpc.AppResponse;
+import org.apache.dubbo.rpc.Result;
 
 import com.alibaba.dubbo.remoting.RemotingException;
 import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
 import com.alibaba.dubbo.remoting.exchange.ResponseFuture;
+import com.alibaba.dubbo.rpc.RpcException;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -36,11 +38,11 @@ import java.util.function.BiConsumer;
  */
 @Deprecated
 public class FutureAdapter<V> implements Future<V> {
-    private CompletableFuture<V> future;
 
-    public FutureAdapter(CompletableFuture<V> future) {
-        this.future = future;
+    private CompletableFuture<Object> future;
 
+    public FutureAdapter(CompletableFuture<Object> future) {
+        this.future = future;
     }
 
     public ResponseFuture getFuture() {
@@ -48,7 +50,7 @@ public class FutureAdapter<V> implements Future<V> {
             @Override
             public Object get() throws RemotingException {
                 try {
-                    return FutureAdapter.this.get();
+                    return future.get();
                 } catch (InterruptedException e) {
                     throw new RemotingException(e);
                 } catch (ExecutionException e) {
@@ -59,7 +61,7 @@ public class FutureAdapter<V> implements Future<V> {
             @Override
             public Object get(int timeoutInMillis) throws RemotingException {
                 try {
-                    return FutureAdapter.this.get(timeoutInMillis, TimeUnit.MILLISECONDS);
+                    return future.get(timeoutInMillis, TimeUnit.MILLISECONDS);
                 } catch (InterruptedException e) {
                     throw new RemotingException(e);
                 } catch (ExecutionException e) {
@@ -76,26 +78,23 @@ public class FutureAdapter<V> implements Future<V> {
 
             @Override
             public boolean isDone() {
-                return FutureAdapter.this.isDone();
+                return future.isDone();
             }
         };
     }
 
     void setCallback(ResponseCallback callback) {
-        if (!(future instanceof org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter)) {
-            return;
-        }
-        org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter futureAdapter = (org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter) future;
-        BiConsumer<AppResponse, ? super Throwable> biConsumer = new BiConsumer<AppResponse, Throwable>() {
+        BiConsumer<Object, ? super Throwable> biConsumer = new BiConsumer<Object, Throwable>() {
 
             @Override
-            public void accept(AppResponse appResponse, Throwable t) {
+            public void accept(Object obj, Throwable t) {
                 if (t != null) {
                     if (t instanceof CompletionException) {
                         t = t.getCause();
                     }
                     callback.caught(t);
                 } else {
+                    AppResponse appResponse = (AppResponse)obj;
                     if (appResponse.hasException()) {
                         callback.caught(appResponse.getException());
                     } else {
@@ -104,15 +103,15 @@ public class FutureAdapter<V> implements Future<V> {
                 }
             }
         };
-        futureAdapter.getAppResponseFuture().whenComplete(biConsumer);
+        future.whenComplete(biConsumer);
     }
 
     public boolean cancel(boolean mayInterruptIfRunning) {
-        return future.cancel(mayInterruptIfRunning);
+        return false;
     }
 
     public boolean isCancelled() {
-        return future.isCancelled();
+        return false;
     }
 
     public boolean isDone() {
@@ -121,11 +120,23 @@ public class FutureAdapter<V> implements Future<V> {
 
     @SuppressWarnings("unchecked")
     public V get() throws InterruptedException, ExecutionException {
-        return future.get();
+        try {
+            return (V) (((Result) future.get()).recreate());
+        } catch (InterruptedException | ExecutionException e)  {
+            throw e;
+        } catch (Throwable e) {
+            throw new RpcException(e);
+        }
     }
 
     @SuppressWarnings("unchecked")
     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-        return future.get(timeout, unit);
+        try {
+            return (V) (((Result) future.get(timeout, unit)).recreate());
+        } catch (InterruptedException | ExecutionException | TimeoutException e)  {
+            throw e;
+        } catch (Throwable e) {
+            throw new RpcException(e);
+        }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 2714409..ef1adc4 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -18,10 +18,10 @@ package org.apache.dubbo.rpc;
 
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.function.BiConsumer;
 
 /**
@@ -139,29 +139,13 @@ public class AsyncRpcResult extends AbstractResult {
     @Override
     public Object recreate() throws Throwable {
         RpcInvocation rpcInvocation = (RpcInvocation) invocation;
+        FutureAdapter future = new FutureAdapter(this);
+        RpcContext.getContext().setFuture(future);
         if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
-            AppResponse appResponse = new AppResponse();
-            CompletableFuture<Object> future = new CompletableFuture<>();
-            appResponse.setValue(future);
-            this.whenComplete((result, t) -> {
-                if (t != null) {
-                    if (t instanceof CompletionException) {
-                        t = t.getCause();
-                    }
-                    future.completeExceptionally(t);
-                } else {
-                    if (result.hasException()) {
-                        future.completeExceptionally(result.getException());
-                    } else {
-                        future.complete(result.getValue());
-                    }
-                }
-            });
-            return appResponse.recreate();
-        } else if (this.isDone()) {
-            return this.get().recreate();
+            return future;
         }
-        return (new AppResponse()).recreate();
+
+        return getAppResponse().recreate();
     }
 
     @Override
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureContext.java
index 8d5a1b6..2ac608a 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureContext.java
@@ -29,7 +29,19 @@ import java.util.concurrent.CompletableFuture;
  */
 public class FutureContext {
 
-    public static InternalThreadLocal<CompletableFuture<?>> futureTL = new InternalThreadLocal<>();
+    private static InternalThreadLocal<FutureContext> futureTL = new InternalThreadLocal<FutureContext>() {
+        @Override
+        protected FutureContext initialValue() {
+            return new FutureContext();
+        }
+    };
+
+    public static FutureContext getContext() {
+        return futureTL.get();
+    }
+
+    private CompletableFuture<?> future;
+    private CompletableFuture<?> compatibleFuture;
 
     /**
      * get future.
@@ -38,8 +50,8 @@ public class FutureContext {
      * @return future
      */
     @SuppressWarnings("unchecked")
-    public static <T> CompletableFuture<T> getCompletableFuture() {
-        return (CompletableFuture<T>) futureTL.get();
+    public <T> CompletableFuture<T> getCompletableFuture() {
+        return (CompletableFuture<T>) future;
     }
 
     /**
@@ -47,8 +59,19 @@ public class FutureContext {
      *
      * @param future
      */
-    public static void setFuture(CompletableFuture<?> future) {
-        futureTL.set(future);
+    public void setFuture(CompletableFuture<?> future) {
+        this.future = future;
+    }
+
+    @Deprecated
+    @SuppressWarnings("unchecked")
+    public <T> CompletableFuture<T> getCompatibleCompletableFuture() {
+        return (CompletableFuture<T>) compatibleFuture;
+    }
+
+    @Deprecated
+    public void setCompatibleFuture(CompletableFuture<?> compatibleFuture) {
+        this.compatibleFuture = compatibleFuture;
     }
 
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
index 5022170..1066c7d 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
@@ -105,6 +105,7 @@ public class RpcContext {
     private Object response;
     private AsyncContext asyncContext;
 
+
     protected RpcContext() {
     }
 
@@ -224,7 +225,7 @@ public class RpcContext {
      */
     @SuppressWarnings("unchecked")
     public <T> CompletableFuture<T> getCompletableFuture() {
-        return FutureContext.getCompletableFuture();
+        return FutureContext.getContext().getCompletableFuture();
     }
 
     /**
@@ -235,7 +236,7 @@ public class RpcContext {
      */
     @SuppressWarnings("unchecked")
     public <T> Future<T> getFuture() {
-        return FutureContext.getCompletableFuture();
+        return FutureContext.getContext().getCompletableFuture();
     }
 
     /**
@@ -244,7 +245,7 @@ public class RpcContext {
      * @param future
      */
     public void setFuture(CompletableFuture<?> future) {
-        FutureContext.setFuture(future);
+        FutureContext.getContext().setFuture(future);
     }
 
     public List<URL> getUrls() {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
similarity index 100%
rename from dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
rename to dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/FutureContextTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/FutureContextTest.java
new file mode 100644
index 0000000..1a907dc
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/FutureContextTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
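+ * Checks that a future stored in {@link FutureContext} by one thread is not visible to another thread.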
+ */
+public class FutureContextTest {
+
+    @Test
+    public void testFutureContext() throws Exception {
+        Thread thread1 = new Thread(() -> {
+            FutureContext.getContext().setFuture(CompletableFuture.completedFuture("future from thread1"));
+            try {
+                Thread.sleep(500);
+                Assertions.assertEquals("future from thread1", FutureContext.getContext().getCompletableFuture().get());
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+        thread1.start();
+
+        Thread.sleep(100);
+
+        Thread thread2 = new Thread(() -> {
+            CompletableFuture future = FutureContext.getContext().getCompletableFuture();
+            Assertions.assertNull(future);
+            FutureContext.getContext().setFuture(CompletableFuture.completedFuture("future from thread2"));
+        });
+        thread2.start();
+
+        Thread.sleep(1000);
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index 3fed73f..2e91224 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -24,10 +24,10 @@ import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.TimeoutException;
 import org.apache.dubbo.remoting.exchange.ExchangeClient;
 import org.apache.dubbo.rpc.AsyncRpcResult;
+import org.apache.dubbo.rpc.FutureContext;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.protocol.AbstractInvoker;
@@ -91,13 +91,13 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
             if (isOneway) {
                 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                 currentClient.send(inv, isSent);
-                RpcContext.getContext().setFuture(null);
                 return AsyncRpcResult.newDefaultAsyncResult(invocation);
             } else {
                 AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                 CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                 asyncRpcResult.subscribeTo(responseFuture);
-                RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
+                // Store the response future for 2.6.x compatibility; e.g. the TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter.
+                FutureContext.getContext().setCompatibleFuture(responseFuture);
                 return asyncRpcResult;
             }
         } catch (TimeoutException e) {
diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
index 3b0e3d9..c36213b 100644
--- a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
@@ -23,14 +23,13 @@ import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.TimeoutException;
 import org.apache.dubbo.remoting.exchange.ExchangeClient;
 import org.apache.dubbo.rpc.AsyncRpcResult;
+import org.apache.dubbo.rpc.FutureContext;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.protocol.AbstractInvoker;
-import org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter;
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -97,7 +96,8 @@ public class ThriftInvoker<T> extends AbstractInvoker<T> {
             AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
             CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
             asyncRpcResult.subscribeTo(responseFuture);
-            RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
+            // Store the response future for 2.6.x compatibility; e.g. the TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter.
+            FutureContext.getContext().setCompatibleFuture(responseFuture);
             return asyncRpcResult;
         } catch (TimeoutException e) {
             throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e);