You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/06/03 09:21:36 UTC

[dubbo] branch 3.0 updated: Refactor AbstractInvoker class (#7952)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new ebedc1d  Refactor AbstractInvoker class (#7952)
ebedc1d is described below

commit ebedc1d95c0d9352fd75e4a41c8053873f4b3896
Author: Kevin_T <59...@qq.com>
AuthorDate: Thu Jun 3 17:21:22 2021 +0800

    Refactor AbstractInvoker class (#7952)
    
    + comments added
    + code optimization
    + make process clearer
---
 .../apache/dubbo/rpc/protocol/AbstractInvoker.java | 137 +++++++++++++++------
 1 file changed, 98 insertions(+), 39 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index c75f107..70756d2 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dubbo.rpc.protocol;
 
+import org.apache.dubbo.common.Node;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.Version;
 import org.apache.dubbo.common.extension.ExtensionLoader;
@@ -61,16 +62,33 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
 
     protected static final Logger logger = LoggerFactory.getLogger(AbstractInvoker.class);
 
+    /**
+     * Service interface type
+     */
     private final Class<T> type;
 
+    /**
+     * {@link Node} url
+     */
     private final URL url;
 
+    /**
+     * {@link Invoker} default attachment
+     */
     private final Map<String, Object> attachment;
 
+    /**
+     * {@link Node} available
+     */
     private volatile boolean available = true;
 
+    /**
+     * {@link Node} destroy
+     */
     private boolean destroyed = false;
 
+    // -- Constructor
+
     public AbstractInvoker(Class<T> type, URL url) {
         this(type, url, (Map<String, Object>) null);
     }
@@ -88,7 +106,9 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
         }
         this.type = type;
         this.url = url;
-        this.attachment = attachment == null ? null : Collections.unmodifiableMap(attachment);
+        this.attachment = attachment == null
+                ? null
+                : Collections.unmodifiableMap(attachment);
     }
 
     private static Map<String, Object> convertAttachment(URL url, String[] keys) {
@@ -105,6 +125,8 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
         return attachment;
     }
 
+    // -- Public api
+
     @Override
     public Class<T> getInterface() {
         return type;
@@ -120,16 +142,16 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
         return available;
     }
 
-    protected void setAvailable(boolean available) {
-        this.available = available;
-    }
-
     @Override
     public void destroy() {
         this.destroyed = true;
         setAvailable(false);
     }
 
+    protected void setAvailable(boolean available) {
+        this.available = available;
+    }
+
     public boolean isDestroyed() {
         return destroyed;
     }
@@ -146,53 +168,82 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
             logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                     + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
         }
+
         RpcInvocation invocation = (RpcInvocation) inv;
-        invocation.setInvoker(this);
+
+        // prepare rpc invocation
+        prepareInvocation(invocation);
+
+        // do invoke rpc invocation and return async result
+        AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
+
+        // wait rpc result if sync
+        waitForResultIfSync(asyncResult, invocation);
+
+        return asyncResult;
+    }
+
+    private void prepareInvocation(RpcInvocation inv) {
+        inv.setInvoker(this);
+
+        addInvocationAttachments(inv);
+
+        inv.setInvokeMode(RpcUtils.getInvokeMode(url, inv));
+
+        RpcUtils.attachInvocationIdIfAsync(getUrl(), inv);
+
+        Byte serializationId = CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DEFAULT_REMOTING_SERIALIZATION));
+        if (serializationId != null) {
+            inv.put(SERIALIZATION_ID_KEY, serializationId);
+        }
+    }
+
+    private void addInvocationAttachments(RpcInvocation invocation) {
+        // invoker attachment
         if (CollectionUtils.isNotEmptyMap(attachment)) {
             invocation.addObjectAttachmentsIfAbsent(attachment);
         }
 
-        Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
-        if (contextAttachments != null && contextAttachments.size() != 0) {
-            invocation.addObjectAttachmentsIfAbsent(contextAttachments);
+        // client context attachment
+        Map<String, Object> clientContextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
+        if (CollectionUtils.isNotEmptyMap(clientContextAttachments)) {
+            invocation.addObjectAttachmentsIfAbsent(clientContextAttachments);
         }
 
+        // server context attachment
         ExtensionLoader<PenetrateAttachmentSelector> selectorExtensionLoader = ExtensionLoader.getExtensionLoader(PenetrateAttachmentSelector.class);
         Set<String> supportedSelectors = selectorExtensionLoader.getSupportedExtensions();
         if (CollectionUtils.isNotEmpty(supportedSelectors)) {
+            // custom context attachment
             for (String supportedSelector : supportedSelectors) {
                 Map<String, Object> selected = selectorExtensionLoader.getExtension(supportedSelector).select();
                 if (CollectionUtils.isNotEmptyMap(selected)) {
-                    ((RpcInvocation) invocation).addObjectAttachmentsIfAbsent(selected);
+                    invocation.addObjectAttachmentsIfAbsent(selected);
                 }
             }
         } else {
-            ((RpcInvocation) invocation).addObjectAttachmentsIfAbsent(RpcContext.getServerAttachment().getObjectAttachments());
+            Map<String, Object> serverContextAttachments = RpcContext.getServerAttachment().getObjectAttachments();
+            invocation.addObjectAttachmentsIfAbsent(serverContextAttachments);
         }
+    }
 
-        invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
-        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
-
-        Byte serializationId = CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DEFAULT_REMOTING_SERIALIZATION));
-        if (serializationId != null) {
-            invocation.put(SERIALIZATION_ID_KEY, serializationId);
-        }
-
-
+    private AsyncRpcResult doInvokeAndReturn(RpcInvocation invocation) {
         AsyncRpcResult asyncResult;
         try {
             asyncResult = (AsyncRpcResult) doInvoke(invocation);
-        } catch (InvocationTargetException e) { // biz exception
+        } catch (InvocationTargetException e) {
             Throwable te = e.getTargetException();
-            if (te == null) {
-                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
-            } else {
+            if (te != null) {
+                // if biz exception
                 if (te instanceof RpcException) {
                     ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                 }
                 asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
+            } else {
+                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
             }
         } catch (RpcException e) {
+            // if biz exception
             if (e.isBiz()) {
                 asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
             } else {
@@ -201,31 +252,33 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
         } catch (Throwable e) {
             asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
         }
+
+        // set server context
         RpcContext.getServiceContext().setFuture(new FutureAdapter<>(asyncResult.getResponseFuture()));
 
-        waitForResultIfSync(asyncResult, invocation);
         return asyncResult;
     }
 
     private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {
+        if (InvokeMode.SYNC != invocation.getInvokeMode()) {
+            return;
+        }
         try {
-            if (InvokeMode.SYNC == invocation.getInvokeMode()) {
-                /**
-                 * NOTICE!
-                 * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
-                 * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
-                 */
-                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
-            }
+            /*
+             * NOTICE!
+             * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
+             * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
+             */
+            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
-            throw new RpcException("Interrupted unexpectedly while waiting for remote result to return!  method: " +
+            throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " +
                     invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
         } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-            if (t instanceof TimeoutException) {
+            Throwable rootCause = e.getCause();
+            if (rootCause instanceof TimeoutException) {
                 throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
                         invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
-            } else if (t instanceof RemotingException) {
+            } else if (rootCause instanceof RemotingException) {
                 throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
                         invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
             } else {
@@ -237,8 +290,12 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
         }
     }
 
+    // -- Protected api
+
     protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
-        ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
+        ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
+                .getDefaultExtension()
+                .getExecutor(url);
         if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
             return new ThreadlessExecutor(sharedExecutor);
         } else {
@@ -246,6 +303,8 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
         }
     }
 
+    /**
+     * Specific implementation of the {@link #invoke(Invocation)} method
+     */
     protected abstract Result doInvoke(Invocation invocation) throws Throwable;
-
 }