You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/12/17 02:00:16 UTC

[dubbo] branch 2.7.5-release updated: Enhance consumer side thread model: threadless executor (#5490)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 2.7.5-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/2.7.5-release by this push:
     new 5f8ac2b  Enhance consumer side thread model: threadless executor (#5490)
5f8ac2b is described below

commit 5f8ac2b2e16ef0fa85b25280b3388a187cc415a5
Author: ken.lj <ke...@gmail.com>
AuthorDate: Tue Dec 17 10:00:05 2019 +0800

    Enhance consumer side thread model: threadless executor (#5490)
---
 .../common/threadpool/ThreadlessExecutor.java      | 21 +++++++++++--
 .../manager/DefaultExecutorRepository.java         | 13 +++++++++
 .../remoting/exchange/support/DefaultFuture.java   | 34 +++++++++++++++-------
 .../java/org/apache/dubbo/rpc/AsyncRpcResult.java  | 19 +++++++-----
 .../dubbo/rpc/protocol/AsyncToSyncInvoker.java     |  5 ++++
 5 files changed, 71 insertions(+), 21 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
index 322d8d9..a0fc084 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
@@ -19,9 +19,11 @@ package org.apache.dubbo.common.threadpool;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.AbstractExecutorService;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +44,8 @@ public class ThreadlessExecutor extends AbstractExecutorService {
 
     private ExecutorService sharedExecutor;
 
+    private CompletableFuture<?> waitingFuture;
+
     private volatile boolean waiting = true;
 
     private final Object lock = new Object();
@@ -50,6 +54,14 @@ public class ThreadlessExecutor extends AbstractExecutorService {
         this.sharedExecutor = sharedExecutor;
     }
 
+    public CompletableFuture<?> getWaitingFuture() {
+        return waitingFuture;
+    }
+
+    public void setWaitingFuture(CompletableFuture<?> waitingFuture) {
+        this.waitingFuture = waitingFuture;
+    }
+
     public boolean isWaiting() {
         return waiting;
     }
@@ -113,9 +125,10 @@ public class ThreadlessExecutor extends AbstractExecutorService {
     /**
      * tells the thread blocking on {@link #waitAndDrain()} to return, despite of the current status, to avoid endless waiting.
      */
-    public void notifyReturn() {
+    public void notifyReturn(Throwable t) {
         // an empty runnable task.
         execute(() -> {
+            waitingFuture.completeExceptionally(t);
         });
     }
 
@@ -125,12 +138,14 @@ public class ThreadlessExecutor extends AbstractExecutorService {
 
     @Override
     public void shutdown() {
-
+        shutdownNow();
     }
 
     @Override
     public List<Runnable> shutdownNow() {
-        return null;
+        notifyReturn(new IllegalStateException("Consumer is shutting down and this call is going to be stopped without " +
+                "receiving any result, usually this is called by a slow provider instance or bad service implementation."));
+        return Collections.emptyList();
     }
 
     @Override
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
index f0aab3c..dd37bff 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -64,6 +64,12 @@ public class DefaultExecutorRepository implements ExecutorRepository {
         serviceExporterExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-exporter-scheduler"));
     }
 
+    /**
+     * Get called when the server or client instance initiating.
+     *
+     * @param url
+     * @return
+     */
     public synchronized ExecutorService createExecutorIfAbsent(URL url) {
         String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
         if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
@@ -87,7 +93,14 @@ public class DefaultExecutorRepository implements ExecutorRepository {
             componentKey = CONSUMER_SIDE;
         }
         Map<Integer, ExecutorService> executors = data.get(componentKey);
+
+        /**
+         * It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
+         * have Executor instances generated and stored.
+         */
         if (executors == null) {
+            logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " +
+                    "before coming to here.");
             return null;
         }
 
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index aeaff45..2fe1eef 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -107,6 +107,10 @@ public class DefaultFuture extends CompletableFuture<Object> {
     public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
         final DefaultFuture future = new DefaultFuture(channel, request, timeout);
         future.setExecutor(executor);
+        // ThreadlessExecutor needs to hold the waiting future in case of circuit return.
+        if (executor instanceof ThreadlessExecutor) {
+            ((ThreadlessExecutor) executor).setWaitingFuture(future);
+        }
         // timeout check
         timeoutCheck(future);
         return future;
@@ -138,6 +142,11 @@ public class DefaultFuture extends CompletableFuture<Object> {
             if (channel.equals(entry.getValue())) {
                 DefaultFuture future = getFuture(entry.getKey());
                 if (future != null && !future.isDone()) {
+                    ExecutorService futureExecutor = future.getExecutor();
+                    if (futureExecutor != null && !futureExecutor.isTerminated()) {
+                        futureExecutor.shutdownNow();
+                    }
+
                     Response disconnectResponse = new Response(future.getId());
                     disconnectResponse.setStatus(Response.CHANNEL_INACTIVE);
                     disconnectResponse.setErrorMessage("Channel " +
@@ -208,7 +217,8 @@ public class DefaultFuture extends CompletableFuture<Object> {
         if (executor != null && executor instanceof ThreadlessExecutor) {
             ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
             if (threadlessExecutor.isWaiting()) {
-                threadlessExecutor.notifyReturn();
+                threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
+                        " which is not an expected state, interrupt the thread manually by returning an exception."));
             }
         }
     }
@@ -271,16 +281,20 @@ public class DefaultFuture extends CompletableFuture<Object> {
                 return;
             }
             if (future.getExecutor() != null) {
-                future.getExecutor().execute(() -> {
-                    // create exception response.
-                    Response timeoutResponse = new Response(future.getId());
-                    // set timeout status.
-                    timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
-                    timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
-                    // handle response.
-                    DefaultFuture.received(future.getChannel(), timeoutResponse, true);
-                });
+                future.getExecutor().execute(() -> notifyTimeout(future));
+            } else {
+                notifyTimeout(future);
             }
         }
+
+        private void notifyTimeout(DefaultFuture future) {
+            // create exception response.
+            Response timeoutResponse = new Response(future.getId());
+            // set timeout status.
+            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
+            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
+            // handle response.
+            DefaultFuture.received(future.getChannel(), timeoutResponse, true);
+        }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index cb73c03..5a6e169 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -96,8 +96,9 @@ public class AsyncRpcResult implements Result {
                 responseFuture.complete(appResponse);
             }
         } catch (Exception e) {
-            // This should never happen;
-            logger.error("Got exception when trying to change the value of the underlying result from AsyncRpcResult.", e);
+            // This should not happen in normal request process;
+            logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
+            throw new RpcException(e);
         }
     }
 
@@ -117,8 +118,9 @@ public class AsyncRpcResult implements Result {
                 responseFuture.complete(appResponse);
             }
         } catch (Exception e) {
-            // This should never happen;
-            logger.error("Got exception when trying to change the value of the underlying result from AsyncRpcResult.", e);
+            // This should not happen in normal request process;
+            logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
+            throw new RpcException(e);
         }
     }
 
@@ -141,8 +143,9 @@ public class AsyncRpcResult implements Result {
                 return responseFuture.get();
             }
         } catch (Exception e) {
-            // This should never happen;
-            logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.", e);
+            // This should not happen in normal request process;
+            logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
+            throw new RpcException(e);
         }
         return new AppResponse();
     }
@@ -158,7 +161,7 @@ public class AsyncRpcResult implements Result {
      */
     @Override
     public Result get() throws InterruptedException, ExecutionException {
-        if (executor != null) {
+        if (executor != null && executor instanceof ThreadlessExecutor) {
             ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
             threadlessExecutor.waitAndDrain();
         }
@@ -167,7 +170,7 @@ public class AsyncRpcResult implements Result {
 
     @Override
     public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-        if (executor != null) {
+        if (executor != null && executor instanceof ThreadlessExecutor) {
             ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
             threadlessExecutor.waitAndDrain();
         }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java
index 0b28532..bc381d2 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java
@@ -53,6 +53,11 @@ public class AsyncToSyncInvoker<T> implements Invoker<T> {
 
         try {
             if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
+                /**
+                 * NOTICE!
+                 * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
+                 * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
+                 */
                 asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
             }
         } catch (InterruptedException e) {