You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/06/03 08:07:40 UTC

[dubbo] branch performance-tuning-2.7.x updated: Reduce context switching cost by optimizing thread model on consumer side. (#4131)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch performance-tuning-2.7.x
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/performance-tuning-2.7.x by this push:
     new 5cc3821  Reduce context switching cost by optimizing thread model on consumer side. (#4131)
5cc3821 is described below

commit 5cc3821438e4f9bcf3a8503ba08e9a0008ef0f4c
Author: ken.lj <ke...@gmail.com>
AuthorDate: Mon Jun 3 16:07:27 2019 +0800

    Reduce context switching cost by optimizing thread model on consumer side. (#4131)
---
 .../dubbo/common/constants/CommonConstants.java    |   3 +
 .../common/threadpool/ThreadlessExecutor.java      | 150 +++++++++++++++++++++
 .../manager/DefaultExecutorRepository.java         | 123 +++++++++++++++++
 .../threadpool/manager/ExecutorRepository.java     |  65 +++++++++
 .../dubbo/common/threadpool/manager/Ring.java      |  66 +++++++++
 ...bo.common.threadpool.manager.ExecutorRepository |   1 +
 .../apache/dubbo/demo/provider/Application.java    |   3 -
 .../dubbo/demo/provider/DemoServiceImpl.java       |   5 +
 .../apache/dubbo/monitor/dubbo/MetricsFilter.java  |   2 +-
 .../apache/dubbo/registry/dubbo/MockChannel.java   |  11 ++
 .../apache/dubbo/registry/dubbo/MockedClient.java  |  15 ++-
 .../java/org/apache/dubbo/remoting/Constants.java  |   4 -
 .../dubbo/remoting/exchange/ExchangeChannel.java   |  22 +++
 .../remoting/exchange/support/DefaultFuture.java   |  44 ++++--
 .../support/header/HeaderExchangeChannel.java      |  15 ++-
 .../support/header/HeaderExchangeClient.java       |  11 ++
 .../dubbo/remoting/transport/AbstractClient.java   |  12 +-
 .../dubbo/remoting/transport/AbstractServer.java   |  39 +-----
 .../dispatcher/WrappedChannelHandler.java          |  88 +++++++-----
 .../dispatcher/all/AllChannelHandler.java          |  16 +--
 .../ConnectionOrderedChannelHandler.java           |  15 +--
 .../DirectChannelHandler.java}                     |  21 +--
 .../dispatcher/direct/DirectDispatcher.java        |   2 +-
 .../execution/ExecutionChannelHandler.java         |  18 +--
 .../message/MessageOnlyChannelHandler.java         |   2 +-
 .../exchange/support/DefaultFutureTest.java        |   4 +-
 .../support/header/HeaderExchangeChannelTest.java  |  12 +-
 .../java/org/apache/dubbo/rpc/AsyncRpcResult.java  |  63 +++++----
 .../java/org/apache/dubbo/rpc}/FutureAdapter.java  |  15 ++-
 .../apache/dubbo/rpc/protocol/AbstractInvoker.java |  28 +++-
 .../rpc/protocol/dubbo/DecodeableRpcResult.java    |   1 +
 .../dubbo/rpc/protocol/dubbo/DubboInvoker.java     |   6 +-
 .../protocol/dubbo/LazyConnectExchangeClient.java  |  15 +++
 .../dubbo/ReferenceCountExchangeClient.java        |  11 ++
 .../dubbo/status/ThreadPoolStatusChecker.java      |   4 +-
 .../dubbo/rpc/protocol/thrift/ThriftInvoker.java   |   3 -
 36 files changed, 723 insertions(+), 192 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index d005ae9..90811e8 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -17,6 +17,7 @@
 
 package org.apache.dubbo.common.constants;
 
+import java.util.concurrent.ExecutorService;
 import java.util.regex.Pattern;
 
 public interface CommonConstants {
@@ -70,6 +71,8 @@ public interface CommonConstants {
 
     int DEFAULT_THREADS = 200;
 
+    String EXECUTOR_SERVICE_COMPONENT_KEY = ExecutorService.class.getName();
+
     String THREADPOOL_KEY = "threadpool";
 
     String THREAD_NAME_KEY = "threadname";
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
new file mode 100644
index 0000000..322d8d9
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.threadpool;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * The most important difference between this Executor and other normal Executor is that this one doesn't manage
+ * any thread.
+ *
+ * Tasks submitted to this executor through {@link #execute(Runnable)} are not scheduled onto a specific thread, as they would be by a normal executor.
+ * Instead, the tasks are stored in a blocking queue and are only executed when a thread calls {@link #waitAndDrain()}; the thread executing the tasks
+ * is exactly the same as the one calling waitAndDrain.
+ */
+public class ThreadlessExecutor extends AbstractExecutorService {
+    private static final Logger logger = LoggerFactory.getLogger(ThreadlessExecutor.class.getName());
+
+    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+
+    private ExecutorService sharedExecutor;
+
+    private volatile boolean waiting = true;
+
+    private final Object lock = new Object();
+
+    public ThreadlessExecutor(ExecutorService sharedExecutor) {
+        this.sharedExecutor = sharedExecutor;
+    }
+
+    public boolean isWaiting() {
+        return waiting;
+    }
+
+    /**
+     * Waits until there is a Runnable, then executes it and all queued Runnables after it.
+     */
+    public void waitAndDrain() throws InterruptedException {
+        Runnable runnable = queue.take();
+
+        synchronized (lock) {
+            waiting = false;
+            runnable.run();
+        }
+
+        runnable = queue.poll();
+        while (runnable != null) {
+            try {
+                runnable.run();
+            } catch (Throwable t) {
+                logger.info(t);
+
+            }
+            runnable = queue.poll();
+        }
+    }
+
+    public long waitAndDrain(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
+        /*long startInMs = System.currentTimeMillis();
+        Runnable runnable = queue.poll(timeout, unit);
+        if (runnable == null) {
+            throw new TimeoutException();
+        }
+        runnable.run();
+        long elapsedInMs = System.currentTimeMillis() - startInMs;
+        long timeLeft = timeout - elapsedInMs;
+        if (timeLeft < 0) {
+            throw new TimeoutException();
+        }
+        return timeLeft;*/
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * If the calling thread is still waiting for a callback task, add the task to the blocking queue to wait for scheduling.
+     * Otherwise, submit it to the shared callback executor directly.
+     *
+     * @param runnable
+     */
+    @Override
+    public void execute(Runnable runnable) {
+        synchronized (lock) {
+            if (!waiting) {
+                sharedExecutor.execute(runnable);
+            } else {
+                queue.add(runnable);
+            }
+        }
+    }
+
+    /**
+     * Tells the thread blocked on {@link #waitAndDrain()} to return, regardless of the current status, to avoid endless waiting.
+     */
+    public void notifyReturn() {
+        // an empty runnable task.
+        execute(() -> {
+        });
+    }
+
+    /**
+     * The following methods are still not supported
+     */
+
+    @Override
+    public void shutdown() {
+
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        return null;
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return false;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return false;
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        return false;
+    }
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
new file mode 100644
index 0000000..1499056
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.threadpool.manager;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.ThreadPool;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
+import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
+
+/**
+ * Consider implementing {@code Lifecycle} to enable executor shutdown when the process stops.
+ */
+public class DefaultExecutorRepository implements ExecutorRepository {
+    private static final Logger logger = LoggerFactory.getLogger(DefaultExecutorRepository.class);
+
+    private int DEFAULT_SCHEDULER_SIZE = Runtime.getRuntime().availableProcessors();
+
+    private final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
+
+    private Ring<ScheduledExecutorService> scheduledExecutors = new Ring<>();
+
+    private ScheduledExecutorService reconnectScheduledExecutor;
+
+    private ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = new ConcurrentHashMap<>();
+
+    public DefaultExecutorRepository() {
+//        for (int i = 0; i < DEFAULT_SCHEDULER_SIZE; i++) {
+//            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-framework-scheduler"));
+//            scheduledExecutors.addItem(scheduler);
+//        }
+//
+//        reconnectScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-reconnect-scheduler"));
+    }
+
+    public ExecutorService createExecutorIfAbsent(URL url) {
+        String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
+        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
+            componentKey = CONSUMER_SIDE;
+        }
+        Map<Integer, ExecutorService> executors = data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>());
+        return executors.computeIfAbsent(url.getPort(), k -> (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url));
+    }
+
+    public ExecutorService getExecutor(URL url) {
+        String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
+        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
+            componentKey = CONSUMER_SIDE;
+        }
+        Map<Integer, ExecutorService> executors = data.get(componentKey);
+        if (executors == null) {
+            return null;
+        }
+        return executors.get(url.getPort());
+    }
+
+    @Override
+    public void updateThreadpool(URL url, ExecutorService executor) {
+        try {
+            if (url.hasParameter(THREADS_KEY)
+                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
+                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
+                int threads = url.getParameter(THREADS_KEY, 0);
+                int max = threadPoolExecutor.getMaximumPoolSize();
+                int core = threadPoolExecutor.getCorePoolSize();
+                if (threads > 0 && (threads != max || threads != core)) {
+                    if (threads < core) {
+                        threadPoolExecutor.setCorePoolSize(threads);
+                        if (core == max) {
+                            threadPoolExecutor.setMaximumPoolSize(threads);
+                        }
+                    } else {
+                        threadPoolExecutor.setMaximumPoolSize(threads);
+                        if (core == max) {
+                            threadPoolExecutor.setCorePoolSize(threads);
+                        }
+                    }
+                }
+            }
+        } catch (Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+    }
+
+    @Override
+    public ScheduledExecutorService nextScheduledExecutor() {
+        return scheduledExecutors.pollItem();
+    }
+
+    @Override
+    public ExecutorService getSharedExecutor() {
+        return SHARED_EXECUTOR;
+    }
+
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
new file mode 100644
index 0000000..ce42081
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.threadpool.manager;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.SPI;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ *
+ */
+@SPI("default")
+public interface ExecutorRepository {
+
+    /**
+     * Called by both Client and Server. TODO: consider separating these two parts.
+     * When the Client or Server starts for the first time, generate a new threadpool according to the parameters specified in the url.
+     *
+     * @param url
+     * @return
+     */
+    ExecutorService createExecutorIfAbsent(URL url);
+
+    ExecutorService getExecutor(URL url);
+
+    /**
+     * Modify some of the threadpool's properties according to the url, for example, coreSize, maxSize, ...
+     *
+     * @param url
+     * @param executor
+     */
+    void updateThreadpool(URL url, ExecutorService executor);
+
+    /**
+     * Returns a scheduler from the scheduler list; call this method whenever you need a scheduler for a cron job.
+     * If your cron job cannot tolerate the possible scheduling delay caused by sharing the same scheduler, please consider defining a dedicated one.
+     *
+     * @return
+     */
+    ScheduledExecutorService nextScheduledExecutor();
+
+    /**
+     * Get the default shared threadpool.
+     *
+     * @return
+     */
+    ExecutorService getSharedExecutor();
+
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/Ring.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/Ring.java
new file mode 100644
index 0000000..eb9e50a
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/Ring.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.threadpool.manager;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class Ring<T> {
+
+    AtomicInteger count = new AtomicInteger();
+
+    private List<T> itemList = new CopyOnWriteArrayList<T>();
+
+    public void addItem(T t) {
+        if (t != null) {
+            itemList.add(t);
+        }
+    }
+
+    public T pollItem() {
+        if (itemList.isEmpty()) {
+            return null;
+        }
+        if (itemList.size() == 1) {
+            return itemList.get(0);
+        }
+
+        if (count.intValue() > Integer.MAX_VALUE - 10000) {
+            count.set(count.get() % itemList.size());
+        }
+
+        int index = Math.abs(count.getAndIncrement()) % itemList.size();
+        return itemList.get(index);
+    }
+
+    public T peekItem() {
+        if (itemList.isEmpty()) {
+            return null;
+        }
+        if (itemList.size() == 1) {
+            return itemList.get(0);
+        }
+        int index = Math.abs(count.get()) % itemList.size();
+        return itemList.get(index);
+    }
+
+    public List<T> listItems() {
+        return Collections.unmodifiableList(itemList);
+    }
+}
diff --git a/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.threadpool.manager.ExecutorRepository b/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
new file mode 100644
index 0000000..44199b0
--- /dev/null
+++ b/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
@@ -0,0 +1 @@
+default=org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository
\ No newline at end of file
diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/Application.java b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/Application.java
index f35f82a..25a9dd4 100644
--- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/Application.java
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/Application.java
@@ -16,8 +16,6 @@
  */
 package org.apache.dubbo.demo.provider;
 
-import org.apache.dubbo.common.utils.ReflectUtils;
-
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
 public class Application {
@@ -27,7 +25,6 @@ public class Application {
      */
     public static void main(String[] args) throws Exception {
         ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-provider.xml");
-        System.err.println(ReflectUtils.getDesc(new Class<?>[]{String.class, String[].class, Object[].class}));
         context.start();
         System.in.read();
     }
diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
index 31b9040..5e57f77 100644
--- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
@@ -30,6 +30,11 @@ public class DemoServiceImpl implements DemoService {
     @Override
     public String sayHello(String name) {
         logger.info("Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
         return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
     }
 
diff --git a/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/MetricsFilter.java b/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/MetricsFilter.java
index 97b75d5..2f26743 100644
--- a/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/MetricsFilter.java
+++ b/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/MetricsFilter.java
@@ -57,6 +57,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_PROTOCOL;
+import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY;
 import static org.apache.dubbo.monitor.Constants.DUBBO_CONSUMER;
 import static org.apache.dubbo.monitor.Constants.DUBBO_CONSUMER_METHOD;
 import static org.apache.dubbo.monitor.Constants.DUBBO_GROUP;
@@ -66,7 +67,6 @@ import static org.apache.dubbo.monitor.Constants.METHOD;
 import static org.apache.dubbo.monitor.Constants.METRICS_PORT;
 import static org.apache.dubbo.monitor.Constants.METRICS_PROTOCOL;
 import static org.apache.dubbo.monitor.Constants.SERVICE;
-import static org.apache.dubbo.remoting.Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
 
 public class MetricsFilter implements Filter {
 
diff --git a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
index 7a4961b..e563da6 100644
--- a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
+++ b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
@@ -24,6 +24,7 @@ import org.apache.dubbo.remoting.exchange.ExchangeHandler;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 
 public class MockChannel implements ExchangeChannel {
 
@@ -93,6 +94,16 @@ public class MockChannel implements ExchangeChannel {
         return null;
     }
 
+    @Override
+    public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
+        return null;
+    }
+
     public ExchangeHandler getExchangeHandler() {
         return null;
     }
diff --git a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
index 1afe0d5..ad06cd2 100644
--- a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
+++ b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
@@ -29,6 +29,7 @@ import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
 
 /**
@@ -81,13 +82,23 @@ public class MockedClient implements ExchangeClient {
     }
 
     public CompletableFuture<Object> request(Object msg) throws RemotingException {
-        return request(msg, 0);
+        return request(msg, null);
     }
 
     public CompletableFuture<Object> request(Object msg, int timeout) throws RemotingException {
+        return this.request(msg, timeout, null);
+    }
+
+    @Override
+    public CompletableFuture<Object> request(Object msg, ExecutorService executor) throws RemotingException {
+        return this.request(msg, 0, executor);
+    }
+
+    @Override
+    public CompletableFuture<Object> request(Object msg, int timeout, ExecutorService executor) throws RemotingException {
         this.invoked = msg;
         return new CompletableFuture<Object>() {
-            public Object get()  throws InterruptedException, ExecutionException {
+            public Object get() throws InterruptedException, ExecutionException {
                 return received;
             }
 
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
index 8fd6361..45e5a67 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
@@ -18,8 +18,6 @@
 package org.apache.dubbo.remoting;
 
 
-import java.util.concurrent.ExecutorService;
-
 public interface Constants {
 
     String BUFFER_KEY = "buffer";
@@ -117,8 +115,6 @@ public interface Constants {
 
     String CHANNEL_SEND_READONLYEVENT_KEY = "channel.readonly.send";
 
-    String EXECUTOR_SERVICE_COMPONENT_KEY = ExecutorService.class.getName();
-
     String RECONNECT_KEY = "reconnect";
 
     int DEFAULT_RECONNECT_PERIOD = 2000;
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
index 0e4917d..c0cf131 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.RemotingException;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 
 /**
  * ExchangeChannel. (API/SPI, Prototype, ThreadSafe)
@@ -33,6 +34,7 @@ public interface ExchangeChannel extends Channel {
      * @return response future
      * @throws RemotingException
      */
+    @Deprecated
     CompletableFuture<Object> request(Object request) throws RemotingException;
 
     /**
@@ -43,9 +45,29 @@ public interface ExchangeChannel extends Channel {
      * @return response future
      * @throws RemotingException
      */
+    @Deprecated
     CompletableFuture<Object> request(Object request, int timeout) throws RemotingException;
 
     /**
+     * send request.
+     *
+     * @param request
+     * @return response future
+     * @throws RemotingException
+     */
+    CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException;
+
+    /**
+     * send request.
+     *
+     * @param request
+     * @param timeout
+     * @return response future
+     * @throws RemotingException
+     */
+    CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException;
+
+    /**
      * get message handler.
      *
      * @return message handler
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index 7f0e3a7..0abecf2 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.remoting.exchange.support;
 
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
 import org.apache.dubbo.common.timer.HashedWheelTimer;
 import org.apache.dubbo.common.timer.Timeout;
 import org.apache.dubbo.common.timer.Timer;
@@ -34,6 +35,7 @@ import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
@@ -64,6 +66,16 @@ public class DefaultFuture extends CompletableFuture<Object> {
     private volatile long sent;
     private Timeout timeoutCheckTask;
 
+    private ExecutorService executor;
+
+    public ExecutorService getExecutor() {
+        return executor;
+    }
+
+    public void setExecutor(ExecutorService executor) {
+        this.executor = executor;
+    }
+
     private DefaultFuture(Channel channel, Request request, int timeout) {
         this.channel = channel;
         this.request = request;
@@ -92,8 +104,9 @@ public class DefaultFuture extends CompletableFuture<Object> {
      * @param timeout timeout
      * @return a new DefaultFuture
      */
-    public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
+    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
         final DefaultFuture future = new DefaultFuture(channel, request, timeout);
+        future.setExecutor(executor);
         // timeout check
         timeoutCheck(future);
         return future;
@@ -178,7 +191,6 @@ public class DefaultFuture extends CompletableFuture<Object> {
         this.cancel(true);
     }
 
-
     private void doReceived(Response res) {
         if (res == null) {
             throw new IllegalStateException("response cannot be null");
@@ -190,6 +202,15 @@ public class DefaultFuture extends CompletableFuture<Object> {
         } else {
             this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
         }
+
+        // the result is being returned, but the caller thread may still be waiting;
+        // to avoid endless waiting for whatever reason, notify the caller thread to return.
+        if (executor != null && executor instanceof ThreadlessExecutor) {
+            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
+            if (threadlessExecutor.isWaiting()) {
+                threadlessExecutor.notifyReturn();
+            }
+        }
     }
 
     private long getId() {
@@ -243,14 +264,17 @@ public class DefaultFuture extends CompletableFuture<Object> {
             if (future == null || future.isDone()) {
                 return;
             }
-            // create exception response.
-            Response timeoutResponse = new Response(future.getId());
-            // set timeout status.
-            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
-            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
-            // handle response.
-            DefaultFuture.received(future.getChannel(), timeoutResponse, true);
-
+            if (future.getExecutor() != null) {
+                future.getExecutor().execute(() -> {
+                    // create exception response.
+                    Response timeoutResponse = new Response(future.getId());
+                    // set timeout status.
+                    timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
+                    timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
+                    // handle response.
+                    DefaultFuture.received(future.getChannel(), timeoutResponse, true);
+                });
+            }
         }
     }
 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
index 2529ac7..f96207b 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
@@ -31,6 +31,7 @@ import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
 import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
@@ -100,11 +101,21 @@ final class HeaderExchangeChannel implements ExchangeChannel {
 
     @Override
     public CompletableFuture<Object> request(Object request) throws RemotingException {
-        return request(request, channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT));
+        return request(request, null);
     }
 
     @Override
     public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
+        return request(request, timeout, null);
+    }
+
+    @Override
+    public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
+        return request(request, channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), executor);
+    }
+
+    @Override
+    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
         if (closed) {
             throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
         }
@@ -113,7 +124,7 @@ final class HeaderExchangeChannel implements ExchangeChannel {
         req.setVersion(Version.getProtocolVersion());
         req.setTwoWay(true);
         req.setData(request);
-        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
+        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
         try {
             channel.send(req);
         } catch (RemotingException e) {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
index 5a010fa..6671d1a 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
@@ -31,6 +31,7 @@ import org.apache.dubbo.remoting.exchange.ExchangeHandler;
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.dubbo.remoting.utils.UrlUtils.getHeartbeat;
@@ -85,6 +86,16 @@ public class HeaderExchangeClient implements ExchangeClient {
     }
 
     @Override
+    public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
+        return channel.request(request, executor);
+    }
+
+    @Override
+    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
+        return channel.request(request, timeout, executor);
+    }
+
+    @Override
     public ChannelHandler getChannelHandler() {
         return channel.getChannelHandler();
     }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
index e005ad5..0241c29 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
@@ -21,7 +21,7 @@ import org.apache.dubbo.common.Version;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.store.DataStore;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
 import org.apache.dubbo.common.utils.ExecutorUtil;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.remoting.Channel;
@@ -36,7 +36,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
 import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
 
@@ -50,6 +49,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
     private final Lock connectLock = new ReentrantLock();
     private final boolean needReconnect;
     protected volatile ExecutorService executor;
+    private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
 
     public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
         super(url, handler);
@@ -84,11 +84,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
                     "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                             + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
         }
-
-        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
-                .getDefaultExtension().get(CONSUMER_SIDE, Integer.toString(url.getPort()));
-        ExtensionLoader.getExtensionLoader(DataStore.class)
-                .getDefaultExtension().remove(CONSUMER_SIDE, Integer.toString(url.getPort()));
+        executor = executorRepository.createExecutorIfAbsent(url);
     }
 
     protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
@@ -196,7 +192,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
 
             } else {
                 if (logger.isInfoEnabled()) {
-                    logger.info("Succeed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+                    logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                             + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                             + ", channel is " + this.getChannel());
                 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
index d1cbdbb..da5f7e3 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
@@ -20,7 +20,7 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.store.DataStore;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
 import org.apache.dubbo.common.utils.ExecutorUtil;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.remoting.Channel;
@@ -32,15 +32,13 @@ import org.apache.dubbo.remoting.Server;
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
 
 import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
-import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
-import static org.apache.dubbo.remoting.Constants.IDLE_TIMEOUT_KEY;
-import static org.apache.dubbo.remoting.Constants.DEFAULT_IDLE_TIMEOUT;
 import static org.apache.dubbo.remoting.Constants.ACCEPTS_KEY;
 import static org.apache.dubbo.remoting.Constants.DEFAULT_ACCEPTS;
+import static org.apache.dubbo.remoting.Constants.DEFAULT_IDLE_TIMEOUT;
+import static org.apache.dubbo.remoting.Constants.IDLE_TIMEOUT_KEY;
 
 /**
  * AbstractServer
@@ -55,6 +53,8 @@ public abstract class AbstractServer extends AbstractEndpoint implements Server
     private int accepts;
     private int idleTimeout;
 
+    private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+
     public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
         super(url, handler);
         localAddress = getUrl().toInetSocketAddress();
@@ -76,9 +76,7 @@ public abstract class AbstractServer extends AbstractEndpoint implements Server
             throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                     + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
         }
-        //fixme replace this with better method
-        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
-        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
+        executor = executorRepository.createExecutorIfAbsent(url);
     }
 
     protected abstract void doOpen() throws Throwable;
@@ -110,30 +108,7 @@ public abstract class AbstractServer extends AbstractEndpoint implements Server
         } catch (Throwable t) {
             logger.error(t.getMessage(), t);
         }
-        try {
-            if (url.hasParameter(THREADS_KEY)
-                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
-                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
-                int threads = url.getParameter(THREADS_KEY, 0);
-                int max = threadPoolExecutor.getMaximumPoolSize();
-                int core = threadPoolExecutor.getCorePoolSize();
-                if (threads > 0 && (threads != max || threads != core)) {
-                    if (threads < core) {
-                        threadPoolExecutor.setCorePoolSize(threads);
-                        if (core == max) {
-                            threadPoolExecutor.setMaximumPoolSize(threads);
-                        }
-                    } else {
-                        threadPoolExecutor.setMaximumPoolSize(threads);
-                        if (core == max) {
-                            threadPoolExecutor.setCorePoolSize(threads);
-                        }
-                    }
-                }
-            }
-        } catch (Throwable t) {
-            logger.error(t.getMessage(), t);
-        }
+        executorRepository.updateThreadpool(url, executor);
         super.setUrl(getUrl().addParameters(url.getParameters()));
     }
 
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
index 4fe83ec..a709a45 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
@@ -20,29 +20,21 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.store.DataStore;
-import org.apache.dubbo.common.threadpool.ThreadPool;
-import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.ChannelHandler;
-import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.remoting.exchange.Response;
+import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
 import org.apache.dubbo.remoting.transport.ChannelHandlerDelegate;
 
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
-import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
 
 public class WrappedChannelHandler implements ChannelHandlerDelegate {
 
     protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);
 
-    protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
-
-    protected final ExecutorService executor;
-
     protected final ChannelHandler handler;
 
     protected final URL url;
@@ -50,24 +42,10 @@ public class WrappedChannelHandler implements ChannelHandlerDelegate {
     public WrappedChannelHandler(ChannelHandler handler, URL url) {
         this.handler = handler;
         this.url = url;
-        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
-
-        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
-        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
-            componentKey = CONSUMER_SIDE;
-        }
-        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
-        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
     }
 
     public void close() {
-        try {
-            if (executor != null) {
-                executor.shutdown();
-            }
-        } catch (Throwable t) {
-            logger.warn("fail to destroy thread pool of server: " + t.getMessage(), t);
-        }
+
     }
 
     @Override
@@ -95,8 +73,16 @@ public class WrappedChannelHandler implements ChannelHandlerDelegate {
         handler.caught(channel, exception);
     }
 
-    public ExecutorService getExecutor() {
-        return executor;
+    protected void sendFeedback(Channel channel, Request request, Throwable t) throws RemotingException {
+        if (request.isTwoWay()) {
+            String msg = "Server side(" + url.getIp() + "," + url.getPort()
+                    + ") thread pool is exhausted, detail msg:" + t.getMessage();
+            Response response = new Response(request.getId(), request.getVersion());
+            response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
+            response.setErrorMessage(msg);
+            channel.send(response);
+            return;
+        }
     }
 
     @Override
@@ -112,12 +98,46 @@ public class WrappedChannelHandler implements ChannelHandlerDelegate {
         return url;
     }
 
-    public ExecutorService getExecutorService() {
-        ExecutorService cexecutor = executor;
-        if (cexecutor == null || cexecutor.isShutdown()) {
-            cexecutor = SHARED_EXECUTOR;
+    /**
+     * Currently, this method is mainly customized to facilitate the thread model on the consumer side:
+     * 1. Use ThreadlessExecutor, i.e., delegate the callback directly to the thread initiating the call.
+     * 2. Use the shared executor to execute the callback.
+     *
+     * @param msg
+     * @return
+     */
+    public ExecutorService getPreferredExecutorService(Object msg) {
+        if (msg instanceof Response) {
+            Response response = (Response) msg;
+            DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
+            // a typical scenario is a response that arrives after the timeout: the timeout response may have already completed the future
+            if (responseFuture == null) {
+                return getSharedExecutorService();
+            } else {
+                ExecutorService executor = responseFuture.getExecutor();
+                if (executor == null || executor.isShutdown()) {
+                    executor = getSharedExecutorService();
+                }
+                return executor;
+            }
+        } else {
+            return getSharedExecutorService();
         }
-        return cexecutor;
     }
 
+    /**
+     * Get the shared executor for the current Server or Client.
+     *
+     * @return
+     */
+    public ExecutorService getSharedExecutorService() {
+        return ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().createExecutorIfAbsent(url);
+    }
+
+    @Deprecated
+    public ExecutorService getExecutorService() {
+        return getSharedExecutorService();
+    }
+
+
 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java
index 3ea9d40..25158d0 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.ExecutionException;
 import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable;
 import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.ChannelState;
 import org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;
@@ -58,22 +57,13 @@ public class AllChannelHandler extends WrappedChannelHandler {
 
     @Override
     public void received(Channel channel, Object message) throws RemotingException {
-        ExecutorService executor = getExecutorService();
+        ExecutorService executor = getPreferredExecutorService(message);
         try {
             executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
         } catch (Throwable t) {
-            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
-            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
         	if(message instanceof Request && t instanceof RejectedExecutionException){
-        		Request request = (Request)message;
-        		if(request.isTwoWay()){
-        			String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
-        			Response response = new Response(request.getId(), request.getVersion());
-        			response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
-        			response.setErrorMessage(msg);
-        			channel.send(response);
-        			return;
-        		}
+                sendFeedback(channel, (Request) message, t);
+                return;
         	}
             throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
         }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java
index 33bc279..2132cac 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java
@@ -24,7 +24,6 @@ import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.ExecutionException;
 import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable;
 import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.ChannelState;
 import org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;
@@ -80,21 +79,13 @@ public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
 
     @Override
     public void received(Channel channel, Object message) throws RemotingException {
-        ExecutorService executor = getExecutorService();
+        ExecutorService executor = getPreferredExecutorService(message);
         try {
             executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
         } catch (Throwable t) {
-            //fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout.
             if (message instanceof Request && t instanceof RejectedExecutionException) {
-                Request request = (Request) message;
-                if (request.isTwoWay()) {
-                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
-                    Response response = new Response(request.getId(), request.getVersion());
-                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
-                    response.setErrorMessage(msg);
-                    channel.send(response);
-                    return;
-                }
+                sendFeedback(channel, (Request) message, t);
+                return;
             }
             throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
         }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectChannelHandler.java
similarity index 66%
copy from dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
copy to dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectChannelHandler.java
index 2f51860..9bb8d1f 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectChannelHandler.java
@@ -14,9 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.remoting.transport.dispatcher.message;
+package org.apache.dubbo.remoting.transport.dispatcher.direct;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.ExecutionException;
@@ -27,19 +28,23 @@ import org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;
 
 import java.util.concurrent.ExecutorService;
 
-public class MessageOnlyChannelHandler extends WrappedChannelHandler {
+public class DirectChannelHandler extends WrappedChannelHandler {
 
-    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
+    public DirectChannelHandler(ChannelHandler handler, URL url) {
         super(handler, url);
     }
 
     @Override
     public void received(Channel channel, Object message) throws RemotingException {
-        ExecutorService executor = getExecutorService();
-        try {
-            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
-        } catch (Throwable t) {
-            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
+        ExecutorService executor = getPreferredExecutorService(message);
+        if (executor instanceof ThreadlessExecutor) {
+            try {
+                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
+            } catch (Throwable t) {
+                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
+            }
+        } else {
+            handler.received(channel, message);
         }
     }
 
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectDispatcher.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectDispatcher.java
index f18065d..aaed4e7 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectDispatcher.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectDispatcher.java
@@ -29,7 +29,7 @@ public class DirectDispatcher implements Dispatcher {
 
     @Override
     public ChannelHandler dispatch(ChannelHandler handler, URL url) {
-        return handler;
+        return new DirectChannelHandler(handler, url);
     }
 
 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
index e39d138..761e26c 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
@@ -17,12 +17,12 @@
 package org.apache.dubbo.remoting.transport.dispatcher.execution;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.ExecutionException;
 import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable;
 import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.ChannelState;
 import org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;
@@ -42,7 +42,8 @@ public class ExecutionChannelHandler extends WrappedChannelHandler {
 
     @Override
     public void received(Channel channel, Object message) throws RemotingException {
-        ExecutorService executor = getExecutorService();
+        ExecutorService executor = getPreferredExecutorService(message);
+
         if (message instanceof Request) {
             try {
                 executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
@@ -51,19 +52,12 @@ public class ExecutionChannelHandler extends WrappedChannelHandler {
                 // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
                 // this scenario from happening, but a better solution should be considered later.
                 if (t instanceof RejectedExecutionException) {
-                    Request request = (Request) message;
-                    if (request.isTwoWay()) {
-                        String msg = "Server side(" + url.getIp() + "," + url.getPort()
-                                + ") thread pool is exhausted, detail msg:" + t.getMessage();
-                        Response response = new Response(request.getId(), request.getVersion());
-                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
-                        response.setErrorMessage(msg);
-                        channel.send(response);
-                        return;
-                    }
+                    sendFeedback(channel, (Request) message, t);
                 }
                 throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
             }
+        } else if (executor instanceof ThreadlessExecutor) {
+            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
         } else {
             handler.received(channel, message);
         }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
index 2f51860..2cd20cc 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
@@ -35,7 +35,7 @@ public class MessageOnlyChannelHandler extends WrappedChannelHandler {
 
     @Override
     public void received(Channel channel, Object message) throws RemotingException {
-        ExecutorService executor = getExecutorService();
+        ExecutorService executor = getPreferredExecutorService(message);
         try {
             executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
         } catch (Throwable t) {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
index 2dd4005..0f19d15 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
@@ -95,7 +95,7 @@ public class DefaultFutureTest {
         // timeout after 5 seconds.
         Channel channel = new MockedChannel();
         Request request = new Request(10);
-        DefaultFuture f = DefaultFuture.newFuture(channel, request, 5000);
+        DefaultFuture f = DefaultFuture.newFuture(channel, request, 5000, null);
         //mark the future is sent
         DefaultFuture.sent(channel, request);
         while (!f.isDone()) {
@@ -119,7 +119,7 @@ public class DefaultFutureTest {
     private DefaultFuture defaultFuture(int timeout) {
         Channel channel = new MockedChannel();
         Request request = new Request(index.getAndIncrement());
-        return DefaultFuture.newFuture(channel, request, timeout);
+        return DefaultFuture.newFuture(channel, request, timeout, null);
     }
 
 }
\ No newline at end of file
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
index 92739bf..2affe7d 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
@@ -17,23 +17,21 @@
 package org.apache.dubbo.remoting.exchange.support.header;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-
-import org.apache.dubbo.remoting.Channel;
-import org.apache.dubbo.remoting.RemotingException;
-import org.apache.dubbo.remoting.exchange.Request;
-
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.util.List;
 
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class HeaderExchangeChannelTest {
@@ -193,7 +191,7 @@ public class HeaderExchangeChannelTest {
     public void closeWithTimeoutTest02() {
         Assertions.assertFalse(channel.isClosed());
         Request request = new Request();
-        DefaultFuture.newFuture(channel, request, 100);
+        DefaultFuture.newFuture(channel, request, 100, null);
         header.close(100);
         //return directly
         header.close(1000);
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 815bec5..978bf7b 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -18,10 +18,14 @@ package org.apache.dubbo.rpc;
 
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 
 /**
@@ -47,6 +51,7 @@ public class AsyncRpcResult extends AbstractResult {
      */
     private RpcContext storedContext;
     private RpcContext storedServerContext;
+    private Executor executor;
 
     private Invocation invocation;
 
@@ -99,7 +104,7 @@ public class AsyncRpcResult extends AbstractResult {
     public Result getAppResponse() {
         try {
             if (this.isDone()) {
-                return this.get();
+                return super.get();
             }
         } catch (Exception e) {
             // This should never happen;
@@ -108,32 +113,36 @@ public class AsyncRpcResult extends AbstractResult {
         return new AppResponse();
     }
 
+    /**
+     * Blocks until the result is available. If the attached executor is a ThreadlessExecutor, the
+     * calling thread first runs waitAndDrain() on it; any other executor (for example a shared pool
+     * set for a non-sync call) is left alone instead of being cast blindly (ClassCastException).
+     *
+     * @return the completed result
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws ExecutionException if the underlying future completed exceptionally
+     */
+    @Override
+    public Result get() throws InterruptedException, ExecutionException {
+        if (executor instanceof ThreadlessExecutor) {
+            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
+            threadlessExecutor.waitAndDrain();
+        }
+        return super.get();
+    }
+
+    /**
+     * Timed variant of {@link #get()}: waits at most {@code timeout} on the future for the result.
+     * <p>
+     * A ThreadlessExecutor, if attached, is drained on the calling thread first; the timeout is then
+     * applied to the wait on the future itself instead of being ignored. Executors of any other type
+     * are not touched, so no ClassCastException can be raised here.
+     *
+     * @param timeout maximum time to wait on the future
+     * @param unit    unit of {@code timeout}
+     * @return the completed result
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws ExecutionException   if the underlying future completed exceptionally
+     * @throws TimeoutException     if the future is not complete within the given timeout
+     */
+    @Override
+    public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        // NOTE(review): waitAndDrain() is called without a timeout, so the bound only covers the final wait.
+        if (executor instanceof ThreadlessExecutor) {
+            ((ThreadlessExecutor) executor).waitAndDrain();
+        }
+        return super.get(timeout, unit);
+    }
+
     @Override
     public Object recreate() throws Throwable {
         RpcInvocation rpcInvocation = (RpcInvocation) invocation;
         if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
-            AppResponse appResponse = new AppResponse();
-            CompletableFuture<Object> future = new CompletableFuture<>();
-            appResponse.setValue(future);
-            this.whenComplete((result, t) -> {
-                if (t != null) {
-                    if (t instanceof CompletionException) {
-                        t = t.getCause();
-                    }
-                    future.completeExceptionally(t);
-                } else {
-                    if (result.hasException()) {
-                        future.completeExceptionally(result.getException());
-                    } else {
-                        future.complete(result.getValue());
-                    }
-                }
-            });
-            return appResponse.recreate();
-        } else if (this.isDone()) {
-            return this.get().recreate();
+            return RpcContext.getContext().getFuture();
         }
-        return (new AppResponse()).recreate();
+        return getAppResponse().recreate();
     }
 
     @Override
@@ -196,6 +205,14 @@ public class AsyncRpcResult extends AbstractResult {
         return invocation;
     }
 
+    /**
+     * Returns the executor stored on this result.
+     *
+     * @return the stored executor; may be {@code null}
+     */
+    public Executor getExecutor() {
+        return executor;
+    }
+
+    /**
+     * Stores the executor associated with this result. {@link #get()} consults it before
+     * waiting for the value.
+     *
+     * @param executor the executor to associate with this result
+     */
+    public void setExecutor(Executor executor) {
+        this.executor = executor;
+    }
+
     /**
      * tmp context to use when the thread switch to Dubbo thread.
      */
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureAdapter.java
similarity index 86%
rename from dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
rename to dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureAdapter.java
index 03954d1..d50f91e 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureAdapter.java
@@ -14,11 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.rpc.protocol.dubbo;
-
-import org.apache.dubbo.rpc.AppResponse;
-import org.apache.dubbo.rpc.RpcException;
+package org.apache.dubbo.rpc;
 
+import java.lang.ref.SoftReference;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -32,7 +30,10 @@ public class FutureAdapter<V> extends CompletableFuture<V> {
 
     private CompletableFuture<AppResponse> appResponseFuture;
 
-    public FutureAdapter(CompletableFuture<AppResponse> future) {
+    private SoftReference<Invocation> invocationSoftReference;
+
+    public FutureAdapter(CompletableFuture<AppResponse> future, Invocation invocation) {
+        this.invocationSoftReference = new SoftReference<>(invocation);
         this.appResponseFuture = future;
         future.whenComplete((appResponse, t) -> {
             if (t != null) {
@@ -53,6 +54,10 @@ public class FutureAdapter<V> extends CompletableFuture<V> {
     // TODO figure out the meaning of cancel in DefaultFuture.
     @Override
     public boolean cancel(boolean mayInterruptIfRunning) {
+        // Disabled sketch, kept for reference only.
+        // NOTE(review): it refers to `cancel`, which is not declared in this method - confirm intent before enabling.
+//        Invocation invocation = invocationSoftReference.get();
+//        if (invocation != null) {
+//            invocation.getInvoker().invoke(cancel);
+//        }
+        // Cancellation is forwarded to the wrapped future and its outcome is returned as-is.
         return appResponseFuture.cancel(mayInterruptIfRunning);
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index 58cb410..be32bb5 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -18,13 +18,18 @@ package org.apache.dubbo.rpc.protocol;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.Version;
+import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
 import org.apache.dubbo.common.utils.ArrayUtils;
 import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.rpc.AsyncRpcResult;
+import org.apache.dubbo.rpc.FutureAdapter;
 import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.InvokeMode;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcContext;
@@ -36,6 +41,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -151,26 +157,38 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
         invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
         RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
 
+        AsyncRpcResult asyncResult;
         try {
-            return doInvoke(invocation);
+            asyncResult = (AsyncRpcResult) doInvoke(invocation);
         } catch (InvocationTargetException e) { // biz exception
             Throwable te = e.getTargetException();
             if (te == null) {
-                return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
+                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
             } else {
                 if (te instanceof RpcException) {
                     ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                 }
-                return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
+                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
             }
         } catch (RpcException e) {
             if (e.isBiz()) {
-                return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
+                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
             } else {
                 throw e;
             }
         } catch (Throwable e) {
-            return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
+            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
+        }
+        RpcContext.getContext().setFuture(new FutureAdapter(asyncResult, inv));
+        return asyncResult;
+    }
+
+    /**
+     * Selects the executor for an invocation.
+     * <p>
+     * The shared executor is looked up from the default {@link ExecutorRepository} extension for
+     * {@code url}. For {@link InvokeMode#SYNC} invocations it is wrapped in a new
+     * {@link ThreadlessExecutor}; for every other mode the shared executor is returned directly.
+     *
+     * @param url the URL used to look up the shared executor
+     * @param inv the invocation whose invoke mode decides which executor is returned
+     * @return a new ThreadlessExecutor for sync invocations, otherwise the shared executor
+     */
+    protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
+        ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
+        // NOTE(review): the invoke mode is resolved against getUrl(), not the url parameter - confirm this is intended.
+        if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
+            return new ThreadlessExecutor(sharedExecutor);
+        } else {
+            return sharedExecutor;
         }
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
index 1a2347c..2a190ad 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
@@ -74,6 +74,7 @@ public class DecodeableRpcResult extends AppResponse implements Codec, Decodeabl
 
     @Override
     public Object decode(Channel channel, InputStream input) throws IOException {
+        log.debug("Decoding in thread -- " + Thread.currentThread().getName());
         ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                 .deserialize(channel.getUrl(), input);
 
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index ece7149..4cf6259 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -35,6 +35,7 @@ import org.apache.dubbo.rpc.support.RpcUtils;
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
@@ -95,9 +96,10 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
                 return AsyncRpcResult.newDefaultAsyncResult(invocation);
             } else {
                 AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
-                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
+                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
+                asyncRpcResult.setExecutor(executor);
+                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout, executor);
                 asyncRpcResult.subscribeTo(responseFuture);
-                RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
                 return asyncRpcResult;
             }
         } catch (TimeoutException e) {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
index a495775..77eae2c 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
@@ -29,6 +29,7 @@ import org.apache.dubbo.remoting.exchange.Exchangers;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -114,6 +115,20 @@ final class LazyConnectExchangeClient implements ExchangeClient {
         return client.request(request, timeout);
     }
 
+    /**
+     * Calls {@code warning()} and {@code initClient()}, then forwards the request and executor to
+     * the underlying client.
+     *
+     * @param request  the request payload
+     * @param executor executor passed through to the underlying client
+     * @return the future returned by the underlying client
+     * @throws RemotingException propagated from the calls made here
+     */
+    @Override
+    public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
+        warning();
+        initClient();
+        return client.request(request, executor);
+    }
+
+    /**
+     * Calls {@code warning()} and {@code initClient()}, then forwards the request, timeout and
+     * executor to the underlying client.
+     *
+     * @param request  the request payload
+     * @param timeout  timeout passed through to the underlying client
+     * @param executor executor passed through to the underlying client
+     * @return the future returned by the underlying client
+     * @throws RemotingException propagated from the calls made here
+     */
+    @Override
+    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
+        warning();
+        initClient();
+        return client.request(request, timeout, executor);
+    }
+
     /**
      * If {@link #REQUEST_WITH_WARNING_KEY} is configured, then warn once every 5000 invocations.
      */
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
index c7074aa..ed3c61b 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
@@ -27,6 +27,7 @@ import org.apache.dubbo.remoting.exchange.ExchangeHandler;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.dubbo.remoting.Constants.RECONNECT_KEY;
@@ -81,6 +82,16 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
     }
 
+    /**
+     * Forwards the request and executor to the wrapped client unchanged.
+     *
+     * @param request  the request payload
+     * @param executor executor passed through to the wrapped client
+     * @return the future returned by the wrapped client
+     * @throws RemotingException propagated from the wrapped client
+     */
     @Override
+    public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
+        return client.request(request, executor);
+    }
+
+    /**
+     * Forwards the request, timeout and executor to the wrapped client unchanged.
+     *
+     * @param request  the request payload
+     * @param timeout  timeout passed through to the wrapped client
+     * @param executor executor passed through to the wrapped client
+     * @return the future returned by the wrapped client
+     * @throws RemotingException propagated from the wrapped client
+     */
+    @Override
+    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
+        return client.request(request, timeout, executor);
+    }
+
+    @Override
     public boolean isConnected() {
         return client.isConnected();
     }
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/status/ThreadPoolStatusChecker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/status/ThreadPoolStatusChecker.java
index 8506f1b..ea623c8 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/status/ThreadPoolStatusChecker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/status/ThreadPoolStatusChecker.java
@@ -16,12 +16,12 @@
  */
 package org.apache.dubbo.rpc.protocol.dubbo.status;
 
+import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.extension.Activate;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.status.Status;
 import org.apache.dubbo.common.status.StatusChecker;
 import org.apache.dubbo.common.store.DataStore;
-import org.apache.dubbo.remoting.Constants;
 
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -36,7 +36,7 @@ public class ThreadPoolStatusChecker implements StatusChecker {
     @Override
     public Status check() {
         DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
-        Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
+        Map<String, Object> executors = dataStore.get(CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY);
 
         StringBuilder msg = new StringBuilder();
         Status.Level level = Status.Level.OK;
diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
index 3b0e3d9..3d0aee9 100644
--- a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
@@ -26,11 +26,9 @@ import org.apache.dubbo.rpc.AsyncRpcResult;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.protocol.AbstractInvoker;
-import org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter;
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -97,7 +95,6 @@ public class ThriftInvoker<T> extends AbstractInvoker<T> {
             AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
             CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
             asyncRpcResult.subscribeTo(responseFuture);
-            RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
             return asyncRpcResult;
         } catch (TimeoutException e) {
             throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e);