You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/06/03 08:07:40 UTC
[dubbo] branch performance-tuning-2.7.x updated: Reduce context
switching cost by optimizing thread model on consumer side. (#4131)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch performance-tuning-2.7.x
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/performance-tuning-2.7.x by this push:
new 5cc3821 Reduce context switching cost by optimizing thread model on consumer side. (#4131)
5cc3821 is described below
commit 5cc3821438e4f9bcf3a8503ba08e9a0008ef0f4c
Author: ken.lj <ke...@gmail.com>
AuthorDate: Mon Jun 3 16:07:27 2019 +0800
Reduce context switching cost by optimizing thread model on consumer side. (#4131)
---
.../dubbo/common/constants/CommonConstants.java | 3 +
.../common/threadpool/ThreadlessExecutor.java | 150 +++++++++++++++++++++
.../manager/DefaultExecutorRepository.java | 123 +++++++++++++++++
.../threadpool/manager/ExecutorRepository.java | 65 +++++++++
.../dubbo/common/threadpool/manager/Ring.java | 66 +++++++++
...bo.common.threadpool.manager.ExecutorRepository | 1 +
.../apache/dubbo/demo/provider/Application.java | 3 -
.../dubbo/demo/provider/DemoServiceImpl.java | 5 +
.../apache/dubbo/monitor/dubbo/MetricsFilter.java | 2 +-
.../apache/dubbo/registry/dubbo/MockChannel.java | 11 ++
.../apache/dubbo/registry/dubbo/MockedClient.java | 15 ++-
.../java/org/apache/dubbo/remoting/Constants.java | 4 -
.../dubbo/remoting/exchange/ExchangeChannel.java | 22 +++
.../remoting/exchange/support/DefaultFuture.java | 44 ++++--
.../support/header/HeaderExchangeChannel.java | 15 ++-
.../support/header/HeaderExchangeClient.java | 11 ++
.../dubbo/remoting/transport/AbstractClient.java | 12 +-
.../dubbo/remoting/transport/AbstractServer.java | 39 +-----
.../dispatcher/WrappedChannelHandler.java | 88 +++++++-----
.../dispatcher/all/AllChannelHandler.java | 16 +--
.../ConnectionOrderedChannelHandler.java | 15 +--
.../DirectChannelHandler.java} | 21 +--
.../dispatcher/direct/DirectDispatcher.java | 2 +-
.../execution/ExecutionChannelHandler.java | 18 +--
.../message/MessageOnlyChannelHandler.java | 2 +-
.../exchange/support/DefaultFutureTest.java | 4 +-
.../support/header/HeaderExchangeChannelTest.java | 12 +-
.../java/org/apache/dubbo/rpc/AsyncRpcResult.java | 63 +++++----
.../java/org/apache/dubbo/rpc}/FutureAdapter.java | 15 ++-
.../apache/dubbo/rpc/protocol/AbstractInvoker.java | 28 +++-
.../rpc/protocol/dubbo/DecodeableRpcResult.java | 1 +
.../dubbo/rpc/protocol/dubbo/DubboInvoker.java | 6 +-
.../protocol/dubbo/LazyConnectExchangeClient.java | 15 +++
.../dubbo/ReferenceCountExchangeClient.java | 11 ++
.../dubbo/status/ThreadPoolStatusChecker.java | 4 +-
.../dubbo/rpc/protocol/thrift/ThriftInvoker.java | 3 -
36 files changed, 723 insertions(+), 192 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index d005ae9..90811e8 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.common.constants;
+import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
public interface CommonConstants {
@@ -70,6 +71,8 @@ public interface CommonConstants {
int DEFAULT_THREADS = 200;
+ String EXECUTOR_SERVICE_COMPONENT_KEY = ExecutorService.class.getName();
+
String THREADPOOL_KEY = "threadpool";
String THREAD_NAME_KEY = "threadname";
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
new file mode 100644
index 0000000..322d8d9
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.threadpool;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An executor that, unlike normal executors, does not manage any thread of its own.
+ *
+ * Tasks submitted through {@link #execute(Runnable)} are not scheduled onto a dedicated worker thread. While the
+ * executor is still in its waiting state they are stored in a blocking queue and are only executed when a thread
+ * calls {@link #waitAndDrain()}; the thread executing those tasks is exactly the one calling waitAndDrain. Once
+ * waitAndDrain has picked up its first task, newly submitted tasks are handed to the shared executor instead.
+ */
+public class ThreadlessExecutor extends AbstractExecutorService {
+    private static final Logger logger = LoggerFactory.getLogger(ThreadlessExecutor.class.getName());
+
+    /** Tasks submitted while {@link #waiting} is still true; drained by {@link #waitAndDrain()}. */
+    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+
+    /** Receives every task submitted after the waiting phase has ended. */
+    private final ExecutorService sharedExecutor;
+
+    /** True until {@link #waitAndDrain()} has taken its first task. */
+    private volatile boolean waiting = true;
+
+    /** Serializes the "still waiting?" check in {@link #execute(Runnable)} with the state flip in waitAndDrain. */
+    private final Object lock = new Object();
+
+    /**
+     * @param sharedExecutor executor that runs tasks submitted after the waiting phase is over
+     */
+    public ThreadlessExecutor(ExecutorService sharedExecutor) {
+        this.sharedExecutor = sharedExecutor;
+    }
+
+    /**
+     * @return true as long as no call to {@link #waitAndDrain()} has picked up a task yet
+     */
+    public boolean isWaiting() {
+        return waiting;
+    }
+
+    /**
+     * Waits until there is a Runnable, then executes it and all queued Runnables after it.
+     *
+     * The first task runs while holding the lock and is not guarded: anything it throws propagates to the
+     * caller. Failures of the remaining queued tasks are logged and do not stop the drain.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while waiting for the first task
+     */
+    public void waitAndDrain() throws InterruptedException {
+        Runnable runnable = queue.take();
+
+        synchronized (lock) {
+            // from now on execute() forwards new tasks to the shared executor
+            waiting = false;
+            runnable.run();
+        }
+
+        // run whatever was queued before the flag was flipped
+        runnable = queue.poll();
+        while (runnable != null) {
+            try {
+                runnable.run();
+            } catch (Throwable t) {
+                logger.info(t);
+            }
+            runnable = queue.poll();
+        }
+    }
+
+    /**
+     * Timed variant of {@link #waitAndDrain()}; not implemented yet.
+     *
+     * @param timeout maximum time to wait
+     * @param unit    unit of {@code timeout}
+     * @return never returns normally
+     * @throws UnsupportedOperationException always
+     */
+    public long waitAndDrain(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * If the calling thread is still waiting for a callback task, add the task into the blocking queue to wait
+     * for schedule. Otherwise, submit to the shared callback executor directly.
+     *
+     * @param runnable the task to queue or forward
+     */
+    @Override
+    public void execute(Runnable runnable) {
+        synchronized (lock) {
+            if (!waiting) {
+                sharedExecutor.execute(runnable);
+            } else {
+                queue.add(runnable);
+            }
+        }
+    }
+
+    /**
+     * Tells the thread blocking on {@link #waitAndDrain()} to return, regardless of the current status, to avoid
+     * endless waiting.
+     */
+    public void notifyReturn() {
+        // an empty runnable task is enough to wake up queue.take()
+        execute(() -> {
+        });
+    }
+
+    /**
+     * The following lifecycle methods are still not supported: this executor never shuts down.
+     */
+
+    @Override
+    public void shutdown() {
+
+    }
+
+    /**
+     * No-op: nothing is cancelled and queued tasks stay queued.
+     *
+     * @return always an empty list (never {@code null}, as required by {@link ExecutorService#shutdownNow()})
+     */
+    @Override
+    public List<Runnable> shutdownNow() {
+        return java.util.Collections.emptyList();
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return false;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return false;
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        return false;
+    }
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
new file mode 100644
index 0000000..1499056
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.threadpool.manager;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.ThreadPool;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
+import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
+
+/**
+ * Default {@link ExecutorRepository}: keeps the executors it creates in a two-level map, first by component key
+ * (consumer side vs. everything else) and then by the port of the URL.
+ *
+ * Consider implementing {@code Lifecycle} to enable executors shutdown when the process stops.
+ */
+public class DefaultExecutorRepository implements ExecutorRepository {
+    private static final Logger logger = LoggerFactory.getLogger(DefaultExecutorRepository.class);
+
+    /** Intended number of shared schedulers; only referenced by the disabled code in the constructor. */
+    private static final int DEFAULT_SCHEDULER_SIZE = Runtime.getRuntime().availableProcessors();
+
+    private final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
+
+    private final Ring<ScheduledExecutorService> scheduledExecutors = new Ring<>();
+
+    private ScheduledExecutorService reconnectScheduledExecutor;
+
+    /** component key -> (port -> executor) */
+    private final ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = new ConcurrentHashMap<>();
+
+    /**
+     * Creates an empty repository. Pre-populating the scheduler ring and the reconnect scheduler is currently
+     * disabled, so {@link #nextScheduledExecutor()} has nothing to hand out.
+     */
+    public DefaultExecutorRepository() {
+//        for (int i = 0; i < DEFAULT_SCHEDULER_SIZE; i++) {
+//            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-framework-scheduler"));
+//            scheduledExecutors.addItem(scheduler);
+//        }
+//
+//        reconnectScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-reconnect-scheduler"));
+    }
+
+    /**
+     * Returns the executor registered for the URL's side and port, creating it through the adaptive
+     * {@link ThreadPool} extension on first use.
+     *
+     * @param url describes the side, the port and the thread pool parameters
+     * @return the existing or newly created executor
+     */
+    @Override
+    public ExecutorService createExecutorIfAbsent(URL url) {
+        Map<Integer, ExecutorService> executors = data.computeIfAbsent(componentKeyOf(url), k -> new ConcurrentHashMap<>());
+        return executors.computeIfAbsent(url.getPort(), k -> (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url));
+    }
+
+    /**
+     * Looks up the executor registered for the URL's side and port without creating one.
+     *
+     * @param url describes the side and the port
+     * @return the registered executor, or {@code null} if none has been created yet
+     */
+    @Override
+    public ExecutorService getExecutor(URL url) {
+        Map<Integer, ExecutorService> executors = data.get(componentKeyOf(url));
+        if (executors == null) {
+            return null;
+        }
+        return executors.get(url.getPort());
+    }
+
+    /**
+     * Resizes the pool to the {@code threads} parameter of the URL. Only applies when the parameter is present
+     * and positive and the executor is a {@link ThreadPoolExecutor} that has not been shut down. Any failure is
+     * logged and swallowed.
+     *
+     * @param url      carries the desired thread count
+     * @param executor the executor to adjust
+     */
+    @Override
+    public void updateThreadpool(URL url, ExecutorService executor) {
+        try {
+            if (url.hasParameter(THREADS_KEY)
+                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
+                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
+                int threads = url.getParameter(THREADS_KEY, 0);
+                int max = threadPoolExecutor.getMaximumPoolSize();
+                int core = threadPoolExecutor.getCorePoolSize();
+                if (threads > 0 && (threads != max || threads != core)) {
+                    if (threads < core) {
+                        // shrinking: lower the core size first so it never exceeds the maximum
+                        threadPoolExecutor.setCorePoolSize(threads);
+                        if (core == max) {
+                            threadPoolExecutor.setMaximumPoolSize(threads);
+                        }
+                    } else {
+                        // growing (or same core): raise the maximum first, then the core for fixed-size pools
+                        threadPoolExecutor.setMaximumPoolSize(threads);
+                        if (core == max) {
+                            threadPoolExecutor.setCorePoolSize(threads);
+                        }
+                    }
+                }
+            }
+        } catch (Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+    }
+
+    /**
+     * @return the next scheduler of the ring, or {@code null} while no scheduler has been added to it
+     */
+    @Override
+    public ScheduledExecutorService nextScheduledExecutor() {
+        return scheduledExecutors.pollItem();
+    }
+
+    /**
+     * @return the cached thread pool shared by this repository
+     */
+    @Override
+    public ExecutorService getSharedExecutor() {
+        return SHARED_EXECUTOR;
+    }
+
+    /**
+     * Picks the first-level map key: the consumer-side key for consumer URLs, the generic executor key otherwise.
+     */
+    private static String componentKeyOf(URL url) {
+        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
+            return CONSUMER_SIDE;
+        }
+        return EXECUTOR_SERVICE_COMPONENT_KEY;
+    }
+
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
new file mode 100644
index 0000000..ce42081
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.threadpool.manager;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.SPI;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Central place to create, look up and tune the executors used by clients and servers.
+ * The default implementation is registered under the SPI name "default".
+ */
+@SPI("default")
+public interface ExecutorRepository {
+
+ /**
+ * Called by both Client and Server. TODO, consider separate these two parts.
+ * When the Client or Server starts for the first time, generate a new threadpool according to the parameters passed in url.
+ *
+ * @param url the url whose parameters describe the threadpool to create
+ * @return the executor associated with the url, created on the first call
+ */
+ ExecutorService createExecutorIfAbsent(URL url);
+
+ /**
+ * Look up the executor previously created for the url, without creating one.
+ *
+ * @param url the url the executor was created for
+ * @return the executor; the default implementation returns null when none has been created
+ */
+ ExecutorService getExecutor(URL url);
+
+ /**
+ * Modify some of the threadpool's properties according to the url, for example, coreSize, maxSize, ...
+ *
+ * @param url the url carrying the new threadpool settings
+ * @param executor the executor to modify
+ */
+ void updateThreadpool(URL url, ExecutorService executor);
+
+ /**
+ * Returns a scheduler from the scheduler list, call this method whenever you need a scheduler for a cron job.
+ * If your cron job cannot tolerate the possible schedule delay caused by sharing the same scheduler, please consider defining a dedicated one.
+ *
+ * @return a scheduler from the scheduler list
+ */
+ ScheduledExecutorService nextScheduledExecutor();
+
+ /**
+ * Get the default shared threadpool.
+ *
+ * @return the shared executor
+ */
+ ExecutorService getSharedExecutor();
+
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/Ring.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/Ring.java
new file mode 100644
index 0000000..eb9e50a
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/Ring.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.threadpool.manager;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A grow-only collection whose items are handed out in round-robin order.
+ *
+ * Items are appended with {@link #addItem(Object)} and never removed. The position is derived from a shared
+ * counter with {@link Math#floorMod(int, int)}, so it stays a valid index even after the counter overflows
+ * into negative values.
+ *
+ * @param <T> type of the items held by the ring
+ */
+public class Ring<T> {
+
+    /** Number of rotating polls so far; may wrap around, which {@code floorMod} tolerates. */
+    final AtomicInteger count = new AtomicInteger();
+
+    private final List<T> itemList = new CopyOnWriteArrayList<T>();
+
+    /**
+     * Appends an item to the ring.
+     *
+     * @param t the item to add; {@code null} is silently ignored
+     */
+    public void addItem(T t) {
+        if (t != null) {
+            itemList.add(t);
+        }
+    }
+
+    /**
+     * Returns the current item and advances the ring by one position.
+     *
+     * @return the next item in rotation, or {@code null} if the ring is empty
+     */
+    public T pollItem() {
+        int size = itemList.size();
+        if (size == 0) {
+            return null;
+        }
+        if (size == 1) {
+            // a single item needs no rotation, so the counter is left untouched
+            return itemList.get(0);
+        }
+
+        // floorMod instead of Math.abs(..) % size: Math.abs(Integer.MIN_VALUE) is still negative
+        return itemList.get(Math.floorMod(count.getAndIncrement(), size));
+    }
+
+    /**
+     * Returns the item the next {@link #pollItem()} would return, without advancing the ring.
+     *
+     * @return the current item, or {@code null} if the ring is empty
+     */
+    public T peekItem() {
+        int size = itemList.size();
+        if (size == 0) {
+            return null;
+        }
+        if (size == 1) {
+            return itemList.get(0);
+        }
+        return itemList.get(Math.floorMod(count.get(), size));
+    }
+
+    /**
+     * @return a read-only view of all items in insertion order
+     */
+    public List<T> listItems() {
+        return Collections.unmodifiableList(itemList);
+    }
+}
diff --git a/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.threadpool.manager.ExecutorRepository b/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
new file mode 100644
index 0000000..44199b0
--- /dev/null
+++ b/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
@@ -0,0 +1 @@
+default=org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository
\ No newline at end of file
diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/Application.java b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/Application.java
index f35f82a..25a9dd4 100644
--- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/Application.java
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/Application.java
@@ -16,8 +16,6 @@
*/
package org.apache.dubbo.demo.provider;
-import org.apache.dubbo.common.utils.ReflectUtils;
-
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Application {
@@ -27,7 +25,6 @@ public class Application {
*/
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-provider.xml");
- System.err.println(ReflectUtils.getDesc(new Class<?>[]{String.class, String[].class, Object[].class}));
context.start();
System.in.read();
}
diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
index 31b9040..5e57f77 100644
--- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
@@ -30,6 +30,11 @@ public class DemoServiceImpl implements DemoService {
@Override
public String sayHello(String name) {
logger.info("Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
}
diff --git a/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/MetricsFilter.java b/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/MetricsFilter.java
index 97b75d5..2f26743 100644
--- a/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/MetricsFilter.java
+++ b/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/MetricsFilter.java
@@ -57,6 +57,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_PROTOCOL;
+import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY;
import static org.apache.dubbo.monitor.Constants.DUBBO_CONSUMER;
import static org.apache.dubbo.monitor.Constants.DUBBO_CONSUMER_METHOD;
import static org.apache.dubbo.monitor.Constants.DUBBO_GROUP;
@@ -66,7 +67,6 @@ import static org.apache.dubbo.monitor.Constants.METHOD;
import static org.apache.dubbo.monitor.Constants.METRICS_PORT;
import static org.apache.dubbo.monitor.Constants.METRICS_PROTOCOL;
import static org.apache.dubbo.monitor.Constants.SERVICE;
-import static org.apache.dubbo.remoting.Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
public class MetricsFilter implements Filter {
diff --git a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
index 7a4961b..e563da6 100644
--- a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
+++ b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
@@ -24,6 +24,7 @@ import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
public class MockChannel implements ExchangeChannel {
@@ -93,6 +94,16 @@ public class MockChannel implements ExchangeChannel {
return null;
}
+ @Override
+ public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
+ return null;
+ }
+
public ExchangeHandler getExchangeHandler() {
return null;
}
diff --git a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
index 1afe0d5..ad06cd2 100644
--- a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
+++ b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
@@ -29,6 +29,7 @@ import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
/**
@@ -81,13 +82,23 @@ public class MockedClient implements ExchangeClient {
}
public CompletableFuture<Object> request(Object msg) throws RemotingException {
- return request(msg, 0);
+ return request(msg, null);
}
public CompletableFuture<Object> request(Object msg, int timeout) throws RemotingException {
+ return this.request(msg, timeout, null);
+ }
+
+ @Override
+ public CompletableFuture<Object> request(Object msg, ExecutorService executor) throws RemotingException {
+ return this.request(msg, 0, executor);
+ }
+
+ @Override
+ public CompletableFuture<Object> request(Object msg, int timeout, ExecutorService executor) throws RemotingException {
this.invoked = msg;
return new CompletableFuture<Object>() {
- public Object get() throws InterruptedException, ExecutionException {
+ public Object get() throws InterruptedException, ExecutionException {
return received;
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
index 8fd6361..45e5a67 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
@@ -18,8 +18,6 @@
package org.apache.dubbo.remoting;
-import java.util.concurrent.ExecutorService;
-
public interface Constants {
String BUFFER_KEY = "buffer";
@@ -117,8 +115,6 @@ public interface Constants {
String CHANNEL_SEND_READONLYEVENT_KEY = "channel.readonly.send";
- String EXECUTOR_SERVICE_COMPONENT_KEY = ExecutorService.class.getName();
-
String RECONNECT_KEY = "reconnect";
int DEFAULT_RECONNECT_PERIOD = 2000;
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
index 0e4917d..c0cf131 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.RemotingException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
/**
* ExchangeChannel. (API/SPI, Prototype, ThreadSafe)
@@ -33,6 +34,7 @@ public interface ExchangeChannel extends Channel {
* @return response future
* @throws RemotingException
*/
+ @Deprecated
CompletableFuture<Object> request(Object request) throws RemotingException;
/**
@@ -43,9 +45,29 @@ public interface ExchangeChannel extends Channel {
* @return response future
* @throws RemotingException
*/
+ @Deprecated
CompletableFuture<Object> request(Object request, int timeout) throws RemotingException;
/**
+ * send request.
+ *
+ * @param request
+ * @return response future
+ * @throws RemotingException
+ */
+ CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException;
+
+ /**
+ * send request.
+ *
+ * @param request
+ * @param timeout
+ * @return response future
+ * @throws RemotingException
+ */
+ CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException;
+
+ /**
* get message handler.
*
* @return message handler
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index 7f0e3a7..0abecf2 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.remoting.exchange.support;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.common.timer.Timer;
@@ -34,6 +35,7 @@ import java.util.Date;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
@@ -64,6 +66,16 @@ public class DefaultFuture extends CompletableFuture<Object> {
private volatile long sent;
private Timeout timeoutCheckTask;
+ private ExecutorService executor;
+
+ public ExecutorService getExecutor() {
+ return executor;
+ }
+
+ public void setExecutor(ExecutorService executor) {
+ this.executor = executor;
+ }
+
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
@@ -92,8 +104,9 @@ public class DefaultFuture extends CompletableFuture<Object> {
* @param timeout timeout
* @return a new DefaultFuture
*/
- public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
+ public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
+ future.setExecutor(executor);
// timeout check
timeoutCheck(future);
return future;
@@ -178,7 +191,6 @@ public class DefaultFuture extends CompletableFuture<Object> {
this.cancel(true);
}
-
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
@@ -190,6 +202,15 @@ public class DefaultFuture extends CompletableFuture<Object> {
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
+
+ // the result is being returned, but the caller thread may still be waiting;
+ // to avoid endless waiting for whatever reason, notify the caller thread to return.
+ if (executor != null && executor instanceof ThreadlessExecutor) {
+ ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
+ if (threadlessExecutor.isWaiting()) {
+ threadlessExecutor.notifyReturn();
+ }
+ }
}
private long getId() {
@@ -243,14 +264,17 @@ public class DefaultFuture extends CompletableFuture<Object> {
if (future == null || future.isDone()) {
return;
}
- // create exception response.
- Response timeoutResponse = new Response(future.getId());
- // set timeout status.
- timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
- timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
- // handle response.
- DefaultFuture.received(future.getChannel(), timeoutResponse, true);
-
+ if (future.getExecutor() != null) {
+ future.getExecutor().execute(() -> {
+ // create exception response.
+ Response timeoutResponse = new Response(future.getId());
+ // set timeout status.
+ timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
+ timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
+ // handle response.
+ DefaultFuture.received(future.getChannel(), timeoutResponse, true);
+ });
+ }
}
}
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
index 2529ac7..f96207b 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
@@ -31,6 +31,7 @@ import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
@@ -100,11 +101,21 @@ final class HeaderExchangeChannel implements ExchangeChannel {
@Override
public CompletableFuture<Object> request(Object request) throws RemotingException {
- return request(request, channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT));
+ return request(request, null);
}
@Override
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
+ return request(request, timeout, null);
+ }
+
+ /**
+ * Sends a two-way request without an explicit timeout.
+ * The timeout is read from the channel URL via
+ * {@code getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT)} and the call is forwarded to
+ * {@link #request(Object, int, ExecutorService)}.
+ *
+ * @param request the request payload
+ * @param executor executor to associate with the created future; may be {@code null}
+ * (the other overloads in this class pass {@code null})
+ * @return the future returned by {@link #request(Object, int, ExecutorService)}
+ * @throws RemotingException propagated from {@link #request(Object, int, ExecutorService)}
+ */
+ @Override
+ public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
+ return request(request, channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), executor);
+ }
+
+ @Override
+ public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
@@ -113,7 +124,7 @@ final class HeaderExchangeChannel implements ExchangeChannel {
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
- DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
+ DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
channel.send(req);
} catch (RemotingException e) {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
index 5a010fa..6671d1a 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
@@ -31,6 +31,7 @@ import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.remoting.utils.UrlUtils.getHeartbeat;
@@ -85,6 +86,16 @@ public class HeaderExchangeClient implements ExchangeClient {
}
+ /**
+ * Forwards the request to the wrapped {@code channel}, passing the executor through unchanged.
+ *
+ * @param request the request payload
+ * @param executor executor handed to {@code channel.request(request, executor)}
+ * @return the future returned by the wrapped channel
+ * @throws RemotingException propagated from the wrapped channel
+ */
@Override
+ public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
+ return channel.request(request, executor);
+ }
+
+ /**
+ * Forwards the request to the wrapped {@code channel}, passing timeout and executor through unchanged.
+ *
+ * @param request the request payload
+ * @param timeout timeout value handed to {@code channel.request(request, timeout, executor)}
+ * @param executor executor handed to {@code channel.request(request, timeout, executor)}
+ * @return the future returned by the wrapped channel
+ * @throws RemotingException propagated from the wrapped channel
+ */
+ @Override
+ public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
+ return channel.request(request, timeout, executor);
+ }
+
+ @Override
public ChannelHandler getChannelHandler() {
return channel.getChannelHandler();
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
index e005ad5..0241c29 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
@@ -21,7 +21,7 @@ import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.store.DataStore;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
@@ -36,7 +36,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
@@ -50,6 +49,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
private final Lock connectLock = new ReentrantLock();
private final boolean needReconnect;
protected volatile ExecutorService executor;
+ private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
@@ -84,11 +84,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
-
- executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
- .getDefaultExtension().get(CONSUMER_SIDE, Integer.toString(url.getPort()));
- ExtensionLoader.getExtensionLoader(DataStore.class)
- .getDefaultExtension().remove(CONSUMER_SIDE, Integer.toString(url.getPort()));
+ executor = executorRepository.createExecutorIfAbsent(url);
}
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
@@ -196,7 +192,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
} else {
if (logger.isInfoEnabled()) {
- logger.info("Succeed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
index d1cbdbb..da5f7e3 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
@@ -20,7 +20,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.store.DataStore;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
@@ -32,15 +32,13 @@ import org.apache.dubbo.remoting.Server;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
-import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
-import static org.apache.dubbo.remoting.Constants.IDLE_TIMEOUT_KEY;
-import static org.apache.dubbo.remoting.Constants.DEFAULT_IDLE_TIMEOUT;
import static org.apache.dubbo.remoting.Constants.ACCEPTS_KEY;
import static org.apache.dubbo.remoting.Constants.DEFAULT_ACCEPTS;
+import static org.apache.dubbo.remoting.Constants.DEFAULT_IDLE_TIMEOUT;
+import static org.apache.dubbo.remoting.Constants.IDLE_TIMEOUT_KEY;
/**
* AbstractServer
@@ -55,6 +53,8 @@ public abstract class AbstractServer extends AbstractEndpoint implements Server
private int accepts;
private int idleTimeout;
+ private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
@@ -76,9 +76,7 @@ public abstract class AbstractServer extends AbstractEndpoint implements Server
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
- //fixme replace this with better method
- DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
- executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
+ executor = executorRepository.createExecutorIfAbsent(url);
}
protected abstract void doOpen() throws Throwable;
@@ -110,30 +108,7 @@ public abstract class AbstractServer extends AbstractEndpoint implements Server
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
- try {
- if (url.hasParameter(THREADS_KEY)
- && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
- ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
- int threads = url.getParameter(THREADS_KEY, 0);
- int max = threadPoolExecutor.getMaximumPoolSize();
- int core = threadPoolExecutor.getCorePoolSize();
- if (threads > 0 && (threads != max || threads != core)) {
- if (threads < core) {
- threadPoolExecutor.setCorePoolSize(threads);
- if (core == max) {
- threadPoolExecutor.setMaximumPoolSize(threads);
- }
- } else {
- threadPoolExecutor.setMaximumPoolSize(threads);
- if (core == max) {
- threadPoolExecutor.setCorePoolSize(threads);
- }
- }
- }
- }
- } catch (Throwable t) {
- logger.error(t.getMessage(), t);
- }
+ executorRepository.updateThreadpool(url, executor);
super.setUrl(getUrl().addParameters(url.getParameters()));
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
index 4fe83ec..a709a45 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
@@ -20,29 +20,21 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.store.DataStore;
-import org.apache.dubbo.common.threadpool.ThreadPool;
-import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
-import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.remoting.exchange.Response;
+import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
import org.apache.dubbo.remoting.transport.ChannelHandlerDelegate;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
-import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
public class WrappedChannelHandler implements ChannelHandlerDelegate {
protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);
- protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
-
- protected final ExecutorService executor;
-
protected final ChannelHandler handler;
protected final URL url;
@@ -50,24 +42,10 @@ public class WrappedChannelHandler implements ChannelHandlerDelegate {
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
- executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
-
- String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
- if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
- componentKey = CONSUMER_SIDE;
- }
- DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
- dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
public void close() {
- try {
- if (executor != null) {
- executor.shutdown();
- }
- } catch (Throwable t) {
- logger.warn("fail to destroy thread pool of server: " + t.getMessage(), t);
- }
+
}
@Override
@@ -95,8 +73,16 @@ public class WrappedChannelHandler implements ChannelHandlerDelegate {
handler.caught(channel, exception);
}
- public ExecutorService getExecutor() {
- return executor;
+    /**
+     * Tells the peer that a request could not be processed by answering it with a
+     * {@code Response.SERVER_THREADPOOL_EXHAUSTED_ERROR} response.
+     * Only two-way requests are answered; for any other request this method does nothing.
+     *
+     * @param channel channel the response is sent on
+     * @param request the request that could not be dispatched
+     * @param t       the failure whose message is appended to the error text
+     * @throws RemotingException propagated from {@code channel.send}
+     */
+    protected void sendFeedback(Channel channel, Request request, Throwable t) throws RemotingException {
+        if (!request.isTwoWay()) {
+            // one-way request: the caller is not waiting for a reply
+            return;
+        }
+        String errorText = "Server side(" + url.getIp() + "," + url.getPort()
+                + ") thread pool is exhausted, detail msg:" + t.getMessage();
+        Response exhausted = new Response(request.getId(), request.getVersion());
+        exhausted.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
+        exhausted.setErrorMessage(errorText);
+        channel.send(exhausted);
+    }
@Override
@@ -112,12 +98,46 @@ public class WrappedChannelHandler implements ChannelHandlerDelegate {
return url;
}
- public ExecutorService getExecutorService() {
- ExecutorService cexecutor = executor;
- if (cexecutor == null || cexecutor.isShutdown()) {
- cexecutor = SHARED_EXECUTOR;
+ /**
+ * Picks the executor that should process the given message.
+ * Currently this method is mainly customized to facilitate the thread model on the consumer side:
+ * 1. For a {@link Response}, use the executor recorded on the matching {@link DefaultFuture}
+ * (e.g. a ThreadlessExecutor, which delegates the callback to the thread that initiated the call).
+ * 2. Otherwise, use the shared executor to execute the callback.
+ *
+ * @param msg the received message; only {@link Response} instances are looked up by id
+ * @return the executor of the future found for the response id, or the shared executor when the
+ * message is not a {@link Response}, no future is found, or that executor is {@code null} or shut down
+ */
+ public ExecutorService getPreferredExecutorService(Object msg) {
+ if (msg instanceof Response) {
+ Response response = (Response) msg;
+ DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
+ // No future was found for this id. A typical scenario is a response that returns after the
+ // timeout: the timeout response may already have completed the future.
+ if (responseFuture == null) {
+ return getSharedExecutorService();
+ } else {
+ ExecutorService executor = responseFuture.getExecutor();
+ if (executor == null || executor.isShutdown()) {
+ executor = getSharedExecutorService();
+ }
+ return executor;
+ }
+ } else {
+ return getSharedExecutorService();
+ }
- return cexecutor;
}
+    /**
+     * Returns the shared executor for the current Server or Client.
+     * The default {@link ExecutorRepository} extension is looked up on every call and asked for
+     * the executor belonging to this handler's {@code url}.
+     *
+     * @return the result of {@code createExecutorIfAbsent(url)} on the default repository
+     */
+    public ExecutorService getSharedExecutorService() {
+        ExecutorRepository repository =
+                ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+        return repository.createExecutorIfAbsent(url);
+    }
+
+ /**
+ * Returns the shared executor.
+ *
+ * @return the result of {@link #getSharedExecutorService()}
+ * @deprecated this method only delegates to {@link #getSharedExecutorService()}; call that directly.
+ */
+ @Deprecated
+ public ExecutorService getExecutorService() {
+ return getSharedExecutorService();
+ }
+
+
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java
index 3ea9d40..25158d0 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.ExecutionException;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.ChannelState;
import org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;
@@ -58,22 +57,13 @@ public class AllChannelHandler extends WrappedChannelHandler {
@Override
public void received(Channel channel, Object message) throws RemotingException {
- ExecutorService executor = getExecutorService();
+ ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
- //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
- //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
if(message instanceof Request && t instanceof RejectedExecutionException){
- Request request = (Request)message;
- if(request.isTwoWay()){
- String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
- Response response = new Response(request.getId(), request.getVersion());
- response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
- response.setErrorMessage(msg);
- channel.send(response);
- return;
- }
+ sendFeedback(channel, (Request) message, t);
+ return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java
index 33bc279..2132cac 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java
@@ -24,7 +24,6 @@ import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.ExecutionException;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.ChannelState;
import org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;
@@ -80,21 +79,13 @@ public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
@Override
public void received(Channel channel, Object message) throws RemotingException {
- ExecutorService executor = getExecutorService();
+ ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
- //fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout.
if (message instanceof Request && t instanceof RejectedExecutionException) {
- Request request = (Request) message;
- if (request.isTwoWay()) {
- String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
- Response response = new Response(request.getId(), request.getVersion());
- response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
- response.setErrorMessage(msg);
- channel.send(response);
- return;
- }
+ sendFeedback(channel, (Request) message, t);
+ return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectChannelHandler.java
similarity index 66%
copy from dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
copy to dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectChannelHandler.java
index 2f51860..9bb8d1f 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectChannelHandler.java
@@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.transport.dispatcher.message;
+package org.apache.dubbo.remoting.transport.dispatcher.direct;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.ExecutionException;
@@ -27,19 +28,23 @@ import org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;
import java.util.concurrent.ExecutorService;
-public class MessageOnlyChannelHandler extends WrappedChannelHandler {
+public class DirectChannelHandler extends WrappedChannelHandler {
- public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
+ public DirectChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
- ExecutorService executor = getExecutorService();
- try {
- executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
- } catch (Throwable t) {
- throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
+ ExecutorService executor = getPreferredExecutorService(message);
+ if (executor instanceof ThreadlessExecutor) {
+ try {
+ executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
+ } catch (Throwable t) {
+ throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
+ }
+ } else {
+ handler.received(channel, message);
}
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectDispatcher.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectDispatcher.java
index f18065d..aaed4e7 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectDispatcher.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectDispatcher.java
@@ -29,7 +29,7 @@ public class DirectDispatcher implements Dispatcher {
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
- return handler;
+ return new DirectChannelHandler(handler, url);
}
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
index e39d138..761e26c 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
@@ -17,12 +17,12 @@
package org.apache.dubbo.remoting.transport.dispatcher.execution;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.ExecutionException;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.ChannelState;
import org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;
@@ -42,7 +42,8 @@ public class ExecutionChannelHandler extends WrappedChannelHandler {
@Override
public void received(Channel channel, Object message) throws RemotingException {
- ExecutorService executor = getExecutorService();
+ ExecutorService executor = getPreferredExecutorService(message);
+
if (message instanceof Request) {
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
@@ -51,19 +52,12 @@ public class ExecutionChannelHandler extends WrappedChannelHandler {
// therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
// this scenario from happening, but a better solution should be considered later.
if (t instanceof RejectedExecutionException) {
- Request request = (Request) message;
- if (request.isTwoWay()) {
- String msg = "Server side(" + url.getIp() + "," + url.getPort()
- + ") thread pool is exhausted, detail msg:" + t.getMessage();
- Response response = new Response(request.getId(), request.getVersion());
- response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
- response.setErrorMessage(msg);
- channel.send(response);
- return;
- }
+ sendFeedback(channel, (Request) message, t);
}
throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
}
+ } else if (executor instanceof ThreadlessExecutor) {
+ executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} else {
handler.received(channel, message);
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
index 2f51860..2cd20cc 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
@@ -35,7 +35,7 @@ public class MessageOnlyChannelHandler extends WrappedChannelHandler {
@Override
public void received(Channel channel, Object message) throws RemotingException {
- ExecutorService executor = getExecutorService();
+ ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
index 2dd4005..0f19d15 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
@@ -95,7 +95,7 @@ public class DefaultFutureTest {
// timeout after 5 seconds.
Channel channel = new MockedChannel();
Request request = new Request(10);
- DefaultFuture f = DefaultFuture.newFuture(channel, request, 5000);
+ DefaultFuture f = DefaultFuture.newFuture(channel, request, 5000, null);
//mark the future is sent
DefaultFuture.sent(channel, request);
while (!f.isDone()) {
@@ -119,7 +119,7 @@ public class DefaultFutureTest {
private DefaultFuture defaultFuture(int timeout) {
Channel channel = new MockedChannel();
Request request = new Request(index.getAndIncrement());
- return DefaultFuture.newFuture(channel, request, timeout);
+ return DefaultFuture.newFuture(channel, request, timeout, null);
}
}
\ No newline at end of file
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
index 92739bf..2affe7d 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
@@ -17,23 +17,21 @@
package org.apache.dubbo.remoting.exchange.support.header;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-
-import org.apache.dubbo.remoting.Channel;
-import org.apache.dubbo.remoting.RemotingException;
-import org.apache.dubbo.remoting.exchange.Request;
-
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.util.List;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class HeaderExchangeChannelTest {
@@ -193,7 +191,7 @@ public class HeaderExchangeChannelTest {
public void closeWithTimeoutTest02() {
Assertions.assertFalse(channel.isClosed());
Request request = new Request();
- DefaultFuture.newFuture(channel, request, 100);
+ DefaultFuture.newFuture(channel, request, 100, null);
header.close(100);
//return directly
header.close(1000);
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 815bec5..978bf7b 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -18,10 +18,14 @@ package org.apache.dubbo.rpc;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Function;
/**
@@ -47,6 +51,7 @@ public class AsyncRpcResult extends AbstractResult {
*/
private RpcContext storedContext;
private RpcContext storedServerContext;
+ private Executor executor;
private Invocation invocation;
@@ -99,7 +104,7 @@ public class AsyncRpcResult extends AbstractResult {
public Result getAppResponse() {
try {
if (this.isDone()) {
- return this.get();
+ return super.get();
}
} catch (Exception e) {
// This should never happen;
@@ -108,32 +113,36 @@ public class AsyncRpcResult extends AbstractResult {
return new AppResponse();
}
+ /**
+ * Waits for the result of this call and returns it.
+ * <p>
+ * If the executor attached to this result is a ThreadlessExecutor, its waitAndDrain() is called first;
+ * the method then delegates to super.get(). Any other executor (or none) is left untouched: the
+ * executor field is a plain Executor and non-SYNC invocations are given the shared ExecutorService,
+ * so an unconditional cast to ThreadlessExecutor would fail with ClassCastException.
+ * <p>
+ * NOTE(review): no timeout is enforced inside this method; presumably the wait is bounded by whoever
+ * completes the underlying future when the call times out - confirm.
+ *
+ * @return the result delivered by super.get()
+ * @throws InterruptedException propagated from waitAndDrain() or super.get()
+ * @throws ExecutionException propagated from super.get()
+ */
+ @Override
+ public Result get() throws InterruptedException, ExecutionException {
+ // instanceof is also false for null, so this covers the "no executor attached" case as well.
+ if (executor instanceof ThreadlessExecutor) {
+ ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
+ threadlessExecutor.waitAndDrain();
+ }
+ return super.get();
+ }
+
+ /**
+ * Timed wait for the result of this call.
+ * <p>
+ * If the attached executor is a ThreadlessExecutor, its waitAndDrain() is called first. The wait on
+ * the result itself is then delegated to super.get(timeout, unit), so the caller's timeout is
+ * honoured instead of being dropped.
+ * <p>
+ * NOTE(review): waitAndDrain() is called without a timeout, so time spent inside it is not counted
+ * against 'timeout' - confirm this is acceptable.
+ *
+ * @param timeout maximum time to wait, passed to super.get(timeout, unit)
+ * @param unit unit of the timeout argument
+ * @return the result delivered by super.get(timeout, unit)
+ * @throws InterruptedException propagated from waitAndDrain() or super.get(timeout, unit)
+ * @throws ExecutionException propagated from super.get(timeout, unit)
+ * @throws TimeoutException propagated from super.get(timeout, unit)
+ */
+ @Override
+ public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ // Only a ThreadlessExecutor can be drained; a shared pool (or null) is skipped rather than cast.
+ if (executor instanceof ThreadlessExecutor) {
+ ((ThreadlessExecutor) executor).waitAndDrain();
+ }
+ return super.get(timeout, unit);
+ }
+
@Override
public Object recreate() throws Throwable {
RpcInvocation rpcInvocation = (RpcInvocation) invocation;
if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
- AppResponse appResponse = new AppResponse();
- CompletableFuture<Object> future = new CompletableFuture<>();
- appResponse.setValue(future);
- this.whenComplete((result, t) -> {
- if (t != null) {
- if (t instanceof CompletionException) {
- t = t.getCause();
- }
- future.completeExceptionally(t);
- } else {
- if (result.hasException()) {
- future.completeExceptionally(result.getException());
- } else {
- future.complete(result.getValue());
- }
- }
- });
- return appResponse.recreate();
- } else if (this.isDone()) {
- return this.get().recreate();
+ return RpcContext.getContext().getFuture();
}
- return (new AppResponse()).recreate();
+ return getAppResponse().recreate();
}
@Override
@@ -196,6 +205,14 @@ public class AsyncRpcResult extends AbstractResult {
return invocation;
}
+ public Executor getExecutor() { // Returns the value of the executor field; get() inspects the same field before blocking.
+ return executor;
+ }
+
+ public void setExecutor(Executor executor) { // Stores the executor that get() inspects before it blocks on the result.
+ this.executor = executor; // no null check here: get() skips the drain step when no executor is set
+ }
+
/**
* tmp context to use when the thread switch to Dubbo thread.
*/
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureAdapter.java
similarity index 86%
rename from dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
rename to dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureAdapter.java
index 03954d1..d50f91e 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureAdapter.java
@@ -14,11 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.rpc.protocol.dubbo;
-
-import org.apache.dubbo.rpc.AppResponse;
-import org.apache.dubbo.rpc.RpcException;
+package org.apache.dubbo.rpc;
+import java.lang.ref.SoftReference;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -32,7 +30,10 @@ public class FutureAdapter<V> extends CompletableFuture<V> {
private CompletableFuture<AppResponse> appResponseFuture;
- public FutureAdapter(CompletableFuture<AppResponse> future) {
+ private SoftReference<Invocation> invocationSoftReference;
+
+ public FutureAdapter(CompletableFuture<AppResponse> future, Invocation invocation) {
+ this.invocationSoftReference = new SoftReference<>(invocation);
this.appResponseFuture = future;
future.whenComplete((appResponse, t) -> {
if (t != null) {
@@ -53,6 +54,10 @@ public class FutureAdapter<V> extends CompletableFuture<V> {
// TODO figure out the meaning of cancel in DefaultFuture.
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
+// Disabled sketch, kept for the TODO above: fetch the Invocation from invocationSoftReference and,
+// if it is still reachable (non-null), issue a cancel call through its invoker:
+// invocation.getInvoker().invoke(cancel);
+// Until that exists, cancellation is simply forwarded to the wrapped appResponseFuture.
return appResponseFuture.cancel(mayInterruptIfRunning);
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index 58cb410..be32bb5 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -18,13 +18,18 @@ package org.apache.dubbo.rpc.protocol;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
+import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ArrayUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.AsyncRpcResult;
+import org.apache.dubbo.rpc.FutureAdapter;
import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.InvokeMode;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
@@ -36,6 +41,7 @@ import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -151,26 +157,38 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
+ AsyncRpcResult asyncResult;
try {
- return doInvoke(invocation);
+ asyncResult = (AsyncRpcResult) doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
- return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
+ asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
- return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
+ asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
}
} catch (RpcException e) {
if (e.isBiz()) {
- return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
+ asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
throw e;
}
} catch (Throwable e) {
- return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
+ asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
+ }
+ RpcContext.getContext().setFuture(new FutureAdapter(asyncResult, inv));
+ return asyncResult;
+ }
+
+ protected ExecutorService getCallbackExecutor(URL url, Invocation inv) { // Chooses the executor for one invocation: a fresh ThreadlessExecutor for SYNC calls, the shared executor otherwise.
+ ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url); // obtained from the default ExecutorRepository extension, keyed by the url argument
+ if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) { // NOTE(review): the mode is resolved with getUrl(), not the url parameter - confirm both are always the same URL
+ return new ThreadlessExecutor(sharedExecutor); // a new wrapper per call, constructed around the shared executor
+ } else {
+ return sharedExecutor; // non-SYNC modes receive the shared executor itself, unwrapped
}
}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
index 1a2347c..2a190ad 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
@@ -74,6 +74,7 @@ public class DecodeableRpcResult extends AppResponse implements Codec, Decodeabl
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
+ log.debug("Decoding in thread -- " + Thread.currentThread().getName());
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index ece7149..4cf6259 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -35,6 +35,7 @@ import org.apache.dubbo.rpc.support.RpcUtils;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
@@ -95,9 +96,10 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
- CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
+ ExecutorService executor = getCallbackExecutor(getUrl(), inv);
+ asyncRpcResult.setExecutor(executor);
+ CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout, executor);
asyncRpcResult.subscribeTo(responseFuture);
- RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
return asyncRpcResult;
}
} catch (TimeoutException e) {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
index a495775..77eae2c 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
@@ -29,6 +29,7 @@ import org.apache.dubbo.remoting.exchange.Exchangers;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -114,6 +115,20 @@ final class LazyConnectExchangeClient implements ExchangeClient {
return client.request(request, timeout);
}
+ @Override
+ public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException { // Executor-aware request without an explicit timeout; forwards to the wrapped client.
+ warning(); // called before every delegated request; body is outside this hunk (presumably the rate-limited warning - confirm)
+ initClient(); // also called first; presumably creates the underlying client on first use - confirm
+ return client.request(request, executor); // request and executor are passed through unchanged
+ }
+
+ @Override
+ public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException { // Executor-aware request with an explicit timeout; forwards to the wrapped client.
+ warning(); // called before every delegated request; body is outside this hunk (presumably the rate-limited warning - confirm)
+ initClient(); // also called first; presumably creates the underlying client on first use - confirm
+ return client.request(request, timeout, executor); // request, timeout and executor are passed through unchanged
+ }
+
/**
* If {@link #REQUEST_WITH_WARNING_KEY} is configured, then warn once every 5000 invocations.
*/
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
index c7074aa..ed3c61b 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
@@ -27,6 +27,7 @@ import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.dubbo.remoting.Constants.RECONNECT_KEY;
@@ -81,6 +82,16 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
}
@Override
+ public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException { // Pass-through: hands request and executor to the wrapped client and returns its future.
+ return client.request(request, executor);
+ }
+
+ @Override
+ public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException { // Pass-through: hands request, timeout and executor to the wrapped client and returns its future.
+ return client.request(request, timeout, executor);
+ }
+
+ @Override
public boolean isConnected() {
return client.isConnected();
}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/status/ThreadPoolStatusChecker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/status/ThreadPoolStatusChecker.java
index 8506f1b..ea623c8 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/status/ThreadPoolStatusChecker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/status/ThreadPoolStatusChecker.java
@@ -16,12 +16,12 @@
*/
package org.apache.dubbo.rpc.protocol.dubbo.status;
+import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.status.Status;
import org.apache.dubbo.common.status.StatusChecker;
import org.apache.dubbo.common.store.DataStore;
-import org.apache.dubbo.remoting.Constants;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -36,7 +36,7 @@ public class ThreadPoolStatusChecker implements StatusChecker {
@Override
public Status check() {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
- Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
+ Map<String, Object> executors = dataStore.get(CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY);
StringBuilder msg = new StringBuilder();
Status.Level level = Status.Level.OK;
diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
index 3b0e3d9..3d0aee9 100644
--- a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
@@ -26,11 +26,9 @@ import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
-import org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -97,7 +95,6 @@ public class ThriftInvoker<T> extends AbstractInvoker<T> {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
asyncRpcResult.subscribeTo(responseFuture);
- RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
return asyncRpcResult;
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e);