You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/12/17 02:00:16 UTC
[dubbo] branch 2.7.5-release updated: Enhance consumer side thread
model: threadless executor (#5490)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 2.7.5-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/2.7.5-release by this push:
new 5f8ac2b Enhance consumer side thread model: threadless executor (#5490)
5f8ac2b is described below
commit 5f8ac2b2e16ef0fa85b25280b3388a187cc415a5
Author: ken.lj <ke...@gmail.com>
AuthorDate: Tue Dec 17 10:00:05 2019 +0800
Enhance consumer side thread model: threadless executor (#5490)
---
.../common/threadpool/ThreadlessExecutor.java | 21 +++++++++++--
.../manager/DefaultExecutorRepository.java | 13 +++++++++
.../remoting/exchange/support/DefaultFuture.java | 34 +++++++++++++++-------
.../java/org/apache/dubbo/rpc/AsyncRpcResult.java | 19 +++++++-----
.../dubbo/rpc/protocol/AsyncToSyncInvoker.java | 5 ++++
5 files changed, 71 insertions(+), 21 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
index 322d8d9..a0fc084 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
@@ -19,9 +19,11 @@ package org.apache.dubbo.common.threadpool;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -42,6 +44,8 @@ public class ThreadlessExecutor extends AbstractExecutorService {
private ExecutorService sharedExecutor;
+ private CompletableFuture<?> waitingFuture;
+
private volatile boolean waiting = true;
private final Object lock = new Object();
@@ -50,6 +54,14 @@ public class ThreadlessExecutor extends AbstractExecutorService {
this.sharedExecutor = sharedExecutor;
}
+ public CompletableFuture<?> getWaitingFuture() {
+ return waitingFuture;
+ }
+
+ public void setWaitingFuture(CompletableFuture<?> waitingFuture) {
+ this.waitingFuture = waitingFuture;
+ }
+
public boolean isWaiting() {
return waiting;
}
@@ -113,9 +125,10 @@ public class ThreadlessExecutor extends AbstractExecutorService {
/**
* tells the thread blocking on {@link #waitAndDrain()} to return, despite of the current status, to avoid endless waiting.
*/
- public void notifyReturn() {
+ public void notifyReturn(Throwable t) {
// an empty runnable task.
execute(() -> {
+ waitingFuture.completeExceptionally(t);
});
}
@@ -125,12 +138,14 @@ public class ThreadlessExecutor extends AbstractExecutorService {
@Override
public void shutdown() {
-
+ shutdownNow();
}
@Override
public List<Runnable> shutdownNow() {
- return null;
+ notifyReturn(new IllegalStateException("Consumer is shutting down and this call is going to be stopped without " +
+ "receiving any result, usually this is called by a slow provider instance or bad service implementation."));
+ return Collections.emptyList();
}
@Override
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
index f0aab3c..dd37bff 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -64,6 +64,12 @@ public class DefaultExecutorRepository implements ExecutorRepository {
serviceExporterExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-exporter-scheduler"));
}
+ /**
+ * Get called when the server or client instance initiating.
+ *
+ * @param url
+ * @return
+ */
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
@@ -87,7 +93,14 @@ public class DefaultExecutorRepository implements ExecutorRepository {
componentKey = CONSUMER_SIDE;
}
Map<Integer, ExecutorService> executors = data.get(componentKey);
+
+ /**
+ * It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
+ * have Executor instances generated and stored.
+ */
if (executors == null) {
+ logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " +
+ "before coming to here.");
return null;
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index aeaff45..2fe1eef 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -107,6 +107,10 @@ public class DefaultFuture extends CompletableFuture<Object> {
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
future.setExecutor(executor);
+ // ThreadlessExecutor needs to hold the waiting future in case of circuit return.
+ if (executor instanceof ThreadlessExecutor) {
+ ((ThreadlessExecutor) executor).setWaitingFuture(future);
+ }
// timeout check
timeoutCheck(future);
return future;
@@ -138,6 +142,11 @@ public class DefaultFuture extends CompletableFuture<Object> {
if (channel.equals(entry.getValue())) {
DefaultFuture future = getFuture(entry.getKey());
if (future != null && !future.isDone()) {
+ ExecutorService futureExecutor = future.getExecutor();
+ if (futureExecutor != null && !futureExecutor.isTerminated()) {
+ futureExecutor.shutdownNow();
+ }
+
Response disconnectResponse = new Response(future.getId());
disconnectResponse.setStatus(Response.CHANNEL_INACTIVE);
disconnectResponse.setErrorMessage("Channel " +
@@ -208,7 +217,8 @@ public class DefaultFuture extends CompletableFuture<Object> {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
if (threadlessExecutor.isWaiting()) {
- threadlessExecutor.notifyReturn();
+ threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
+ " which is not an expected state, interrupt the thread manually by returning an exception."));
}
}
}
@@ -271,16 +281,20 @@ public class DefaultFuture extends CompletableFuture<Object> {
return;
}
if (future.getExecutor() != null) {
- future.getExecutor().execute(() -> {
- // create exception response.
- Response timeoutResponse = new Response(future.getId());
- // set timeout status.
- timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
- timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
- // handle response.
- DefaultFuture.received(future.getChannel(), timeoutResponse, true);
- });
+ future.getExecutor().execute(() -> notifyTimeout(future));
+ } else {
+ notifyTimeout(future);
}
}
+
+ private void notifyTimeout(DefaultFuture future) {
+ // create exception response.
+ Response timeoutResponse = new Response(future.getId());
+ // set timeout status.
+ timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
+ timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
+ // handle response.
+ DefaultFuture.received(future.getChannel(), timeoutResponse, true);
+ }
}
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index cb73c03..5a6e169 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -96,8 +96,9 @@ public class AsyncRpcResult implements Result {
responseFuture.complete(appResponse);
}
} catch (Exception e) {
- // This should never happen;
- logger.error("Got exception when trying to change the value of the underlying result from AsyncRpcResult.", e);
+ // This should not happen in normal request process;
+ logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
+ throw new RpcException(e);
}
}
@@ -117,8 +118,9 @@ public class AsyncRpcResult implements Result {
responseFuture.complete(appResponse);
}
} catch (Exception e) {
- // This should never happen;
- logger.error("Got exception when trying to change the value of the underlying result from AsyncRpcResult.", e);
+ // This should not happen in normal request process;
+ logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
+ throw new RpcException(e);
}
}
@@ -141,8 +143,9 @@ public class AsyncRpcResult implements Result {
return responseFuture.get();
}
} catch (Exception e) {
- // This should never happen;
- logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.", e);
+ // This should not happen in normal request process;
+ logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
+ throw new RpcException(e);
}
return new AppResponse();
}
@@ -158,7 +161,7 @@ public class AsyncRpcResult implements Result {
*/
@Override
public Result get() throws InterruptedException, ExecutionException {
- if (executor != null) {
+ if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
threadlessExecutor.waitAndDrain();
}
@@ -167,7 +170,7 @@ public class AsyncRpcResult implements Result {
@Override
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- if (executor != null) {
+ if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
threadlessExecutor.waitAndDrain();
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java
index 0b28532..bc381d2 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java
@@ -53,6 +53,11 @@ public class AsyncToSyncInvoker<T> implements Invoker<T> {
try {
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
+ /**
+ * NOTICE!
+ * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
+ * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
+ */
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {