You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/06/03 09:21:36 UTC
[dubbo] branch 3.0 updated: Refactor AbstractInvoker class (#7952)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new ebedc1d Refactor AbstractInvoker class (#7952)
ebedc1d is described below
commit ebedc1d95c0d9352fd75e4a41c8053873f4b3896
Author: Kevin_T <59...@qq.com>
AuthorDate: Thu Jun 3 17:21:22 2021 +0800
Refactor AbstractInvoker class (#7952)
+ comments added
+ code optimization
+ make process clearer
---
.../apache/dubbo/rpc/protocol/AbstractInvoker.java | 137 +++++++++++++++------
1 file changed, 98 insertions(+), 39 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index c75f107..70756d2 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.rpc.protocol;
+import org.apache.dubbo.common.Node;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.extension.ExtensionLoader;
@@ -61,16 +62,33 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
protected static final Logger logger = LoggerFactory.getLogger(AbstractInvoker.class);
+ /**
+ * Service interface type
+ */
private final Class<T> type;
+ /**
+ * {@link Node} url
+ */
private final URL url;
+ /**
+ * {@link Invoker} default attachment
+ */
private final Map<String, Object> attachment;
+ /**
+ * {@link Node} available
+ */
private volatile boolean available = true;
+ /**
+ * {@link Node} destroy
+ */
private boolean destroyed = false;
+ // -- Constructor
+
public AbstractInvoker(Class<T> type, URL url) {
this(type, url, (Map<String, Object>) null);
}
@@ -88,7 +106,9 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
}
this.type = type;
this.url = url;
- this.attachment = attachment == null ? null : Collections.unmodifiableMap(attachment);
+ this.attachment = attachment == null
+ ? null
+ : Collections.unmodifiableMap(attachment);
}
private static Map<String, Object> convertAttachment(URL url, String[] keys) {
@@ -105,6 +125,8 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
return attachment;
}
+ // -- Public api
+
@Override
public Class<T> getInterface() {
return type;
@@ -120,16 +142,16 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
return available;
}
- protected void setAvailable(boolean available) {
- this.available = available;
- }
-
@Override
public void destroy() {
this.destroyed = true;
setAvailable(false);
}
+ protected void setAvailable(boolean available) {
+ this.available = available;
+ }
+
public boolean isDestroyed() {
return destroyed;
}
@@ -146,53 +168,82 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
+ ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
}
+
RpcInvocation invocation = (RpcInvocation) inv;
- invocation.setInvoker(this);
+
+ // prepare rpc invocation
+ prepareInvocation(invocation);
+
+ // do invoke rpc invocation and return async result
+ AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
+
+ // wait rpc result if sync
+ waitForResultIfSync(asyncResult, invocation);
+
+ return asyncResult;
+ }
+
+ private void prepareInvocation(RpcInvocation inv) {
+ inv.setInvoker(this);
+
+ addInvocationAttachments(inv);
+
+ inv.setInvokeMode(RpcUtils.getInvokeMode(url, inv));
+
+ RpcUtils.attachInvocationIdIfAsync(getUrl(), inv);
+
+ Byte serializationId = CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DEFAULT_REMOTING_SERIALIZATION));
+ if (serializationId != null) {
+ inv.put(SERIALIZATION_ID_KEY, serializationId);
+ }
+ }
+
+ private void addInvocationAttachments(RpcInvocation invocation) {
+ // invoker attachment
if (CollectionUtils.isNotEmptyMap(attachment)) {
invocation.addObjectAttachmentsIfAbsent(attachment);
}
- Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
- if (contextAttachments != null && contextAttachments.size() != 0) {
- invocation.addObjectAttachmentsIfAbsent(contextAttachments);
+ // client context attachment
+ Map<String, Object> clientContextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
+ if (CollectionUtils.isNotEmptyMap(clientContextAttachments)) {
+ invocation.addObjectAttachmentsIfAbsent(clientContextAttachments);
}
+ // server context attachment
ExtensionLoader<PenetrateAttachmentSelector> selectorExtensionLoader = ExtensionLoader.getExtensionLoader(PenetrateAttachmentSelector.class);
Set<String> supportedSelectors = selectorExtensionLoader.getSupportedExtensions();
if (CollectionUtils.isNotEmpty(supportedSelectors)) {
+ // custom context attachment
for (String supportedSelector : supportedSelectors) {
Map<String, Object> selected = selectorExtensionLoader.getExtension(supportedSelector).select();
if (CollectionUtils.isNotEmptyMap(selected)) {
- ((RpcInvocation) invocation).addObjectAttachmentsIfAbsent(selected);
+ invocation.addObjectAttachmentsIfAbsent(selected);
}
}
} else {
- ((RpcInvocation) invocation).addObjectAttachmentsIfAbsent(RpcContext.getServerAttachment().getObjectAttachments());
+ Map<String, Object> serverContextAttachments = RpcContext.getServerAttachment().getObjectAttachments();
+ invocation.addObjectAttachmentsIfAbsent(serverContextAttachments);
}
+ }
- invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
- RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
-
- Byte serializationId = CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DEFAULT_REMOTING_SERIALIZATION));
- if (serializationId != null) {
- invocation.put(SERIALIZATION_ID_KEY, serializationId);
- }
-
-
+ private AsyncRpcResult doInvokeAndReturn(RpcInvocation invocation) {
AsyncRpcResult asyncResult;
try {
asyncResult = (AsyncRpcResult) doInvoke(invocation);
- } catch (InvocationTargetException e) { // biz exception
+ } catch (InvocationTargetException e) {
Throwable te = e.getTargetException();
- if (te == null) {
- asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
- } else {
+ if (te != null) {
+ // if biz exception
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
+ } else {
+ asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
}
} catch (RpcException e) {
+ // if biz exception
if (e.isBiz()) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
@@ -201,31 +252,33 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
} catch (Throwable e) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
}
+
+ // set server context
RpcContext.getServiceContext().setFuture(new FutureAdapter<>(asyncResult.getResponseFuture()));
- waitForResultIfSync(asyncResult, invocation);
return asyncResult;
}
private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {
+ if (InvokeMode.SYNC != invocation.getInvokeMode()) {
+ return;
+ }
try {
- if (InvokeMode.SYNC == invocation.getInvokeMode()) {
- /**
- * NOTICE!
- * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
- * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
- */
- asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
- }
+ /*
+ * NOTICE!
+ * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
+ * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
+ */
+ asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " +
+ throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof TimeoutException) {
+ Throwable rootCause = e.getCause();
+ if (rootCause instanceof TimeoutException) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
- } else if (t instanceof RemotingException) {
+ } else if (rootCause instanceof RemotingException) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} else {
@@ -237,8 +290,12 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
}
}
+ // -- Protected api
+
protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
- ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
+ ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
+ .getDefaultExtension()
+ .getExecutor(url);
if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
return new ThreadlessExecutor(sharedExecutor);
} else {
@@ -246,6 +303,8 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
}
}
+ /**
+ * Specific implementation of the {@link #invoke(Invocation)} method
+ */
protected abstract Result doInvoke(Invocation invocation) throws Throwable;
-
}