You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/10/14 01:03:54 UTC
[pulsar] branch master updated: Use ThreadPoolExecutor instead of
EventLoop (#8208)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3a298f3 Use ThreadPoolExecutor instead of EventLoop (#8208)
3a298f3 is described below
commit 3a298f3404d597e6a94de981c5fbe570264dcba1
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Wed Oct 14 09:03:39 2020 +0800
Use ThreadPoolExecutor instead of EventLoop (#8208)
### Motivation
When Netty's `EventLoop` receives a new task,it will call `eventFdWrite`, and then trigger system calls, such as: system_call_fastpath, eventfd_write
After we replaced EventLoop with a native jdk thread pool, the performance improved
### Modifications
Use ThreadPoolExecutor instead of EventLoop
### Verifying this change
We use pulsar perf for testing
before:
Aggregated throughput stats --- 11715556 records received --- 68813.420 msg/s --- 537.605 Mbit/s
after:
Aggregated throughput stats --- 18392800 records received --- 133314.602 msg/s --- 1041.520 Mbit/s
---
.../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 6 ++----
.../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 9 ++++++++-
2 files changed, 10 insertions(+), 5 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index a1756a8..41bb237 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -227,9 +227,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
} else {
// Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
// recursion and stack overflow
- client.eventLoopGroup().execute(() -> {
- receiveMessageFromConsumer(consumer);
- });
+ client.getInternalExecutorService().execute(() -> receiveMessageFromConsumer(consumer));
}
});
}
@@ -298,7 +296,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
break;
}
- client.eventLoopGroup().execute(() -> {
+ client.getInternalExecutorService().execute(() -> {
receiveMessageFromConsumer(consumer);
});
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index cbdcb27..64f1ef9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -41,8 +41,10 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -65,7 +67,6 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.transaction.TransactionBuilder;
-import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -96,6 +97,7 @@ public class PulsarClientImpl implements PulsarClient {
private final ConnectionPool cnxPool;
private final Timer timer;
private final ExecutorProvider externalExecutorProvider;
+ private final ExecutorProvider internalExecutorService;
public enum State {
Open, Closing, Closed
@@ -145,6 +147,7 @@ public class PulsarClientImpl implements PulsarClient {
conf.getAuthentication().start();
this.cnxPool = cnxPool;
externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), getThreadFactory("pulsar-external-listener"));
+ internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), getThreadFactory("pulsar-client-internal"));
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, eventLoopGroup);
} else {
@@ -591,6 +594,7 @@ public class PulsarClientImpl implements PulsarClient {
cnxPool.close();
timer.stop();
externalExecutorProvider.shutdownNow();
+ internalExecutorService.shutdownNow();
conf.getAuthentication().close();
} catch (Throwable t) {
log.warn("Failed to shutdown Pulsar client", t);
@@ -815,6 +819,9 @@ public class PulsarClientImpl implements PulsarClient {
return CompletableFuture.completedFuture(schema);
}
+ public ExecutorService getInternalExecutorService() {
+ return internalExecutorService.getExecutor();
+ }
//
// Transaction related API
//