You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/10/14 01:03:54 UTC

[pulsar] branch master updated: Use ThreadPoolExecutor instead of EventLoop (#8208)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a298f3  Use ThreadPoolExecutor instead of EventLoop (#8208)
3a298f3 is described below

commit 3a298f3404d597e6a94de981c5fbe570264dcba1
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Wed Oct 14 09:03:39 2020 +0800

    Use ThreadPoolExecutor instead of EventLoop (#8208)
    
    ### Motivation
    When Netty's `EventLoop` receives a new task,it will call `eventFdWrite`, and then trigger system calls, such as: system_call_fastpath, eventfd_write
    After we replaced EventLoop with a native jdk thread pool, the performance improved
    
    ### Modifications
    Use ThreadPoolExecutor instead of EventLoop
    
    ### Verifying this change
    We use pulsar perf for testing
    before:
    Aggregated throughput stats --- 11715556 records received --- 68813.420 msg/s --- 537.605 Mbit/s
    
    after:
    Aggregated throughput stats --- 18392800 records received --- 133314.602 msg/s --- 1041.520 Mbit/s
---
 .../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java   | 6 ++----
 .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java     | 9 ++++++++-
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index a1756a8..41bb237 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -227,9 +227,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             } else {
                 // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
                 // recursion and stack overflow
-                client.eventLoopGroup().execute(() -> {
-                    receiveMessageFromConsumer(consumer);
-                });
+                client.getInternalExecutorService().execute(() -> receiveMessageFromConsumer(consumer));
             }
         });
     }
@@ -298,7 +296,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                     break;
                 }
 
-                client.eventLoopGroup().execute(() -> {
+                client.getInternalExecutorService().execute(() -> {
                     receiveMessageFromConsumer(consumer);
                 });
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index cbdcb27..64f1ef9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -41,8 +41,10 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -65,7 +67,6 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.transaction.TransactionBuilder;
-import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -96,6 +97,7 @@ public class PulsarClientImpl implements PulsarClient {
     private final ConnectionPool cnxPool;
     private final Timer timer;
     private final ExecutorProvider externalExecutorProvider;
+    private final ExecutorProvider internalExecutorService;
 
     public enum State {
         Open, Closing, Closed
@@ -145,6 +147,7 @@ public class PulsarClientImpl implements PulsarClient {
         conf.getAuthentication().start();
         this.cnxPool = cnxPool;
         externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), getThreadFactory("pulsar-external-listener"));
+        internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), getThreadFactory("pulsar-client-internal"));
         if (conf.getServiceUrl().startsWith("http")) {
             lookup = new HttpLookupService(conf, eventLoopGroup);
         } else {
@@ -591,6 +594,7 @@ public class PulsarClientImpl implements PulsarClient {
             cnxPool.close();
             timer.stop();
             externalExecutorProvider.shutdownNow();
+            internalExecutorService.shutdownNow();
             conf.getAuthentication().close();
         } catch (Throwable t) {
             log.warn("Failed to shutdown Pulsar client", t);
@@ -815,6 +819,9 @@ public class PulsarClientImpl implements PulsarClient {
         return CompletableFuture.completedFuture(schema);
     }
 
+    public ExecutorService getInternalExecutorService() {
+        return internalExecutorService.getExecutor();
+    }
     //
     // Transaction related API
     //