You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/04/27 06:24:26 UTC

[dubbo] branch master updated: bugfix:fix alibaba RpcContext setAttachment(String, String) (#6052)

This is an automated email from the ASF dual-hosted git repository.

liujieqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 85ad18f  bugfix:fix alibaba RpcContext setAttachment(String,String) (#6052)
85ad18f is described below

commit 85ad18fe22c447568f1c9bfb1c58d645c3dd7b9a
Author: ken.lj <ke...@gmail.com>
AuthorDate: Mon Apr 27 14:24:08 2020 +0800

    bugfix:fix alibaba RpcContext setAttachment(String,String) (#6052)
---
 .../java/com/alibaba/dubbo/rpc/RpcContext.java     | 371 +++++++++++++++++++--
 1 file changed, 348 insertions(+), 23 deletions(-)

diff --git a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
index db3c7cb..08cc647 100644
--- a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
+++ b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
@@ -17,45 +17,371 @@
 
 package com.alibaba.dubbo.rpc;
 
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.rpc.FutureContext;
 
+import com.alibaba.dubbo.common.Constants;
+import com.alibaba.dubbo.common.URL;
 import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
 
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 @Deprecated
-public class RpcContext extends org.apache.dubbo.rpc.RpcContext {
+public class RpcContext {
 
     public static RpcContext getContext() {
-        return newInstance(org.apache.dubbo.rpc.RpcContext.getContext());
+        return new RpcContext(org.apache.dubbo.rpc.RpcContext.getContext());
     }
 
-    private static RpcContext newInstance(org.apache.dubbo.rpc.RpcContext rpcContext) {
-        RpcContext copy = new RpcContext();
-        copy.getAttachments().putAll(rpcContext.getAttachments());
-        copy.get().putAll(rpcContext.get());
+    public static RpcContext getServerContext() {
+        return new RpcContext(org.apache.dubbo.rpc.RpcContext.getServerContext());
+    }
+
+    public static void removeServerContext() {
+        org.apache.dubbo.rpc.RpcContext.removeServerContext();
+    }
+
+    public static void removeContext() {
+        org.apache.dubbo.rpc.RpcContext.removeContext();
+    }
+
+    private org.apache.dubbo.rpc.RpcContext newRpcContext;
+
+    public RpcContext(org.apache.dubbo.rpc.RpcContext newRpcContext) {
+        this.newRpcContext = newRpcContext;
+    }
+
+    public Object getRequest() {
+        return newRpcContext.getRequest();
+    }
+
+    public <T> T getRequest(Class<T> clazz) {
+        return newRpcContext.getRequest(clazz);
+    }
+
+
+    public void setRequest(Object request) {
+        newRpcContext.setRequest(request);
+    }
+
+    /**
+     * Get the response object of the underlying RPC protocol, e.g. HttpServletResponse
+     *
+     * @return null if the underlying protocol doesn't provide support for getting response
+     */
+    public Object getResponse() {
+        return newRpcContext.getResponse();
+    }
+
+    /**
+     * Get the response object of the underlying RPC protocol, e.g. HttpServletResponse
+     *
+     * @return null if the underlying protocol doesn't provide support for getting response or the response is not of the specified type
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T getResponse(Class<T> clazz) {
+        return newRpcContext.getResponse(clazz);
+    }
+
+    public void setResponse(Object response) {
+        newRpcContext.setResponse(response);
+    }
+
+    /**
+     * is provider side.
+     *
+     * @return provider side.
+     */
+    public boolean isProviderSide() {
+        return newRpcContext.isProviderSide();
+    }
+
+    /**
+     * is consumer side.
+     *
+     * @return consumer side.
+     */
+    public boolean isConsumerSide() {
+        return newRpcContext.isConsumerSide();
+    }
+
+    public List<URL> getUrls() {
+        List<org.apache.dubbo.common.URL> newUrls = newRpcContext.getUrls();
+        if (CollectionUtils.isNotEmpty(newUrls)) {
+            List<URL> urls = new ArrayList<>(newUrls.size());
+            for (org.apache.dubbo.common.URL newUrl : newUrls) {
+                urls.add(new URL(newUrl));
+            }
+            return urls;
+        }
+        return Collections.emptyList();
+    }
+
+    public void setUrls(List<URL> urls) {
+        if (CollectionUtils.isNotEmpty(urls)) {
+            List<org.apache.dubbo.common.URL> newUrls = new ArrayList<>(urls.size());
+            for (URL url : urls) {
+                newUrls.add(url.getOriginalURL());
+            }
+            newRpcContext.setUrls(newUrls);
+        }
+    }
+
+    public URL getUrl() {
+        return new URL(newRpcContext.getUrl());
+    }
+
+    public void setUrl(URL url) {
+        newRpcContext.setUrl(url.getOriginalURL());
+    }
+
+    public String getMethodName() {
+        return newRpcContext.getMethodName();
+    }
+
+    public void setMethodName(String methodName) {
+        newRpcContext.setMethodName(methodName);
+    }
+
+    public Class<?>[] getParameterTypes() {
+        return newRpcContext.getParameterTypes();
+    }
+
+    public void setParameterTypes(Class<?>[] parameterTypes) {
+        newRpcContext.setParameterTypes(parameterTypes);
+    }
+
+    public Object[] getArguments() {
+        return newRpcContext.getArguments();
+    }
+
+    public void setArguments(Object[] arguments) {
+        newRpcContext.setArguments(arguments);
+    }
+
+    public RpcContext setLocalAddress(String host, int port) {
+        newRpcContext.setLocalAddress(host, port);
+        return this;
+    }
+
+    /**
+     * get local address.
+     *
+     * @return local address
+     */
+    public InetSocketAddress getLocalAddress() {
+        return newRpcContext.getLocalAddress();
+    }
+
+    public RpcContext setLocalAddress(InetSocketAddress address) {
+        newRpcContext.setLocalAddress(address);
+        return this;
+    }
 
-        copy.setUrls(rpcContext.getUrls());
-        copy.setUrl(rpcContext.getUrl());
-        copy.setMethodName(rpcContext.getMethodName());
-        copy.setParameterTypes(rpcContext.getParameterTypes());
-        copy.setArguments(rpcContext.getArguments());
-        copy.setLocalAddress(rpcContext.getLocalAddress());
-        copy.setRemoteAddress(rpcContext.getRemoteAddress());
-        copy.setRemoteApplicationName(rpcContext.getRemoteApplicationName());
-        copy.setInvokers(rpcContext.getInvokers());
-        copy.setInvoker(rpcContext.getInvoker());
-        copy.setInvocation(rpcContext.getInvocation());
+    public String getLocalAddressString() {
+        return newRpcContext.getLocalAddressString();
+    }
+
+    public String getLocalHostName() {
+        return newRpcContext.getLocalHostName();
+    }
+
+    public RpcContext setRemoteAddress(String host, int port) {
+        newRpcContext.setRemoteAddress(host, port);
+        return this;
+    }
+
+    public InetSocketAddress getRemoteAddress() {
+        return newRpcContext.getRemoteAddress();
+    }
 
-        copy.setRequest(rpcContext.getRequest());
-        copy.setResponse(rpcContext.getResponse());
-        copy.setAsyncContext(rpcContext.getAsyncContext());
+    public RpcContext setRemoteAddress(InetSocketAddress address) {
+        newRpcContext.setRemoteAddress(address);
+        return this;
+    }
+
+    public String getRemoteAddressString() {
+        return newRpcContext.getRemoteAddressString();
+    }
+
+    public String getRemoteHostName() {
+        return newRpcContext.getRemoteHostName();
+    }
+
+    public String getLocalHost() {
+        return newRpcContext.getLocalHost();
+    }
 
-        return copy;
+    public int getLocalPort() {
+        return newRpcContext.getLocalPort();
+    }
+
+    public String getRemoteHost() {
+        return newRpcContext.getRemoteHost();
+    }
+
+    public int getRemotePort() {
+        return newRpcContext.getRemotePort();
+    }
+
+    public String getAttachment(String key) {
+        return newRpcContext.getAttachment(key);
+    }
+
+    public RpcContext setAttachment(String key, String value) {
+        newRpcContext.setAttachment(key, value);
+        return this;
+    }
+
+    public RpcContext removeAttachment(String key) {
+        newRpcContext.removeAttachment(key);
+        return this;
+    }
+
+    public Map<String, String> getAttachments() {
+        return newRpcContext.getAttachments();
+    }
+
+    public RpcContext setAttachments(Map<String, String> attachment) {
+        newRpcContext.setAttachments(attachment);
+        return this;
+    }
+
+    public void clearAttachments() {
+        newRpcContext.clearAttachments();
+    }
+
+    /**
+     * get values.
+     *
+     * @return values
+     */
+    public Map<String, Object> get() {
+        return newRpcContext.get();
+    }
+
+    /**
+     * set value.
+     *
+     * @param key
+     * @param value
+     * @return context
+     */
+    public RpcContext set(String key, Object value) {
+        newRpcContext.set(key, value);
+        return this;
+    }
+
+    public RpcContext remove(String key) {
+        newRpcContext.remove(key);
+        return this;
+    }
+
+    public Object get(String key) {
+        return newRpcContext.get(key);
+    }
+
+    @Deprecated
+    public boolean isServerSide() {
+        return isProviderSide();
+    }
+
+    @Deprecated
+    public boolean isClientSide() {
+        return isConsumerSide();
+    }
+
+    /**
+     * Async invocation. Timeout will be handled even if <code>Future.get()</code> is not called.
+     *
+     * @param callable
+     * @return get the return result from <code>future.get()</code>
+     */
+    @SuppressWarnings("unchecked")
+    public <T> Future<T> asyncCall(Callable<T> callable) {
+        try {
+            try {
+                setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
+                final T o = callable.call();
+                //local invoke will return directly
+                if (o != null) {
+                    FutureTask<T> f = new FutureTask<T>(new Callable<T>() {
+                        @Override
+                        public T call() throws Exception {
+                            return o;
+                        }
+                    });
+                    f.run();
+                    return f;
+                } else {
+
+                }
+            } catch (Exception e) {
+                throw new RpcException(e);
+            } finally {
+                removeAttachment(Constants.ASYNC_KEY);
+            }
+        } catch (final RpcException e) {
+            return new Future<T>() {
+                @Override
+                public boolean cancel(boolean mayInterruptIfRunning) {
+                    return false;
+                }
+
+                @Override
+                public boolean isCancelled() {
+                    return false;
+                }
+
+                @Override
+                public boolean isDone() {
+                    return true;
+                }
+
+                @Override
+                public T get() throws InterruptedException, ExecutionException {
+                    throw new ExecutionException(e.getCause());
+                }
+
+                @Override
+                public T get(long timeout, TimeUnit unit)
+                        throws InterruptedException, ExecutionException,
+                        TimeoutException {
+                    return get();
+                }
+            };
+        }
+        return ((Future<T>) getContext().getFuture());
+    }
+
+    /**
+     * one way async call, send request only, and result is not required
+     *
+     * @param runnable
+     */
+    public void asyncCall(Runnable runnable) {
+        try {
+            setAttachment(Constants.RETURN_KEY, Boolean.FALSE.toString());
+            runnable.run();
+        } catch (Throwable e) {
+            // FIXME should put exception in future?
+            throw new RpcException("oneway call error ." + e.getMessage(), e);
+        } finally {
+            removeAttachment(Constants.RETURN_KEY);
+        }
     }
 
-    @Override
     public <T> Future<T> getFuture() {
         CompletableFuture completableFuture = FutureContext.getContext().getCompatibleCompletableFuture();
         if (completableFuture == null) {
@@ -64,7 +390,6 @@ public class RpcContext extends org.apache.dubbo.rpc.RpcContext {
         return new FutureAdapter(completableFuture);
     }
 
-    @Override
     public void setFuture(CompletableFuture<?> future) {
         FutureContext.getContext().setCompatibleFuture(future);
     }