You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:31 UTC

[pulsar] 26/29: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption (#16236)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 63f5289865a04114c93ab3174d7f77c54ecbd340
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jun 28 11:29:32 2022 +0800

    [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption (#16236)
    
    ### Motivation
    
    The Scheduled Executor doesn't work very efficiently because each task will add to a DelayedQueue(A priority queue) first even if using the `.execute()` method without any schedule delay.
    
    <img width="1845" alt="image" src="https://user-images.githubusercontent.com/12592133/175871343-ecda138f-43a2-472e-ac42-8efdefb58810.png">
    
    <img width="1848" alt="image" src="https://user-images.githubusercontent.com/12592133/175871415-3d8d9fbd-f140-4a4b-a78d-306c1ec9673c.png">
    
    Profile result:
    [perf_consumer_0.html.txt](https://github.com/apache/pulsar/files/8989093/perf_consumer_0.html.txt)
    
    Running a performance test for single topic max message read rate test:
    
    ```
    bin/pulsar-perf consume test -q 1000000 -p 100000000
    bin/pulsar-perf produce test -r 1000000 -s 1 -mk random -o 10000 -threads 2
    ```
    
    Without this PR (2.10.1):
    
    ```
    Profiling started
    2022-06-27T13:44:01,183+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 23919664 msg --- 265702.851  msg/s --- 2.027 Mbit/s  --- Latency: mean: 49430.572 ms - med: 49406 - 95pct: 52853 - 99pct: 53024 - 99.9pct: 53053 - 99.99pct: 53056 - Max: 53057
    2022-06-27T13:44:11,196+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 26690802 msg --- 276759.125  msg/s --- 2.112 Mbit/s  --- Latency: mean: 56106.186 ms - med: 56000 - 95pct: 59289 - 99pct: 59985 - 99.9pct: 60037 - 99.99pct: 60042 - Max: 60042
    2022-06-27T13:44:21,216+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 28788693 msg --- 209467.861  msg/s --- 1.598 Mbit/s  --- Latency: mean: 63523.143 ms - med: 63580 - 95pct: 67202 - 99pct: 67523 - 99.9pct: 67547 - 99.99pct: 67548 - Max: 67548
    2022-06-27T13:44:31,233+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 31255365 msg --- 246190.932  msg/s --- 1.878 Mbit/s  --- Latency: mean: 71152.370 ms - med: 71058 - 95pct: 74555 - 99pct: 74806 - 99.9pct: 74842 - 99.99pct: 74847 - Max: 74847
    2022-06-27T13:44:41,247+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 33606630 msg --- 234769.313  msg/s --- 1.791 Mbit/s  --- Latency: mean: 78636.478 ms - med: 78724 - 95pct: 81694 - 99pct: 82090 - 99.9pct: 82279 - 99.99pct: 82285 - Max: 82286
    ```
    
    With this PR:
    
    ```
    Profiling started
    2022-06-27T13:56:20,426+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 431272207 msg --- 1079360.516  msg/s --- 8.235 Mbit/s  --- Latency: mean: 272.645 ms - med: 334 - 95pct: 470 - 99pct: 510 - 99.9pct: 522 - 99.99pct: 523 - Max: 524
    2022-06-27T13:56:30,438+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 441292346 msg --- 1000645.852  msg/s --- 7.634 Mbit/s  --- Latency: mean: 15.512 ms - med: 14 - 95pct: 29 - 99pct: 39 - 99.9pct: 54 - 99.99pct: 55 - Max: 55
    2022-06-27T13:56:40,450+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 451303308 msg --- 999973.040  msg/s --- 7.629 Mbit/s  --- Latency: mean: 18.265 ms - med: 14 - 95pct: 53 - 99pct: 98 - 99.9pct: 174 - 99.99pct: 176 - Max: 177
    2022-06-27T13:56:50,462+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 461308082 msg --- 999309.458  msg/s --- 7.624 Mbit/s  --- Latency: mean: 14.728 ms - med: 14 - 95pct: 18 - 99pct: 41 - 99.9pct: 50 - 99.99pct: 51 - Max: 52
    2022-06-27T13:57:00,475+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 471327606 msg --- 1000738.584  msg/s --- 7.635 Mbit/s  --- Latency: mean: 21.291 ms - med: 16 - 95pct: 52 - 99pct: 61 - 99.9pct: 65 - 99.99pct: 66 - Max: 66
    ```
    
    Profile result with this PR:
    
    [perf_consumer_1.html.txt](https://github.com/apache/pulsar/files/8989095/perf_consumer_1.html.txt)
    
    ### Modification
    
    - Change internal executor and external executor to normal executor service
    - Added a new ScheduledExecutorProvider to handle the scheduled tasks.
    
    (cherry picked from commit 96237a9615fefa2bed247b416bf1a12d8bc4b201)
---
 .../transaction/pendingack/PendingAckStore.java    |  4 +--
 .../pendingack/impl/InMemoryPendingAckStore.java   |  4 +--
 .../pendingack/impl/MLPendingAckStore.java         |  4 +--
 .../pendingack/impl/PendingAckHandleImpl.java      |  4 +--
 .../persistent/PersistentSubscriptionTest.java     |  4 +--
 .../pulsar/client/api/MultiTopicsConsumerTest.java |  2 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    |  9 +++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 13 +++++---
 .../client/impl/MultiTopicsConsumerImpl.java       |  4 ++-
 .../pulsar/client/impl/PulsarClientImpl.java       | 12 +++++++-
 .../pulsar/client/util/ExecutorProvider.java       | 10 +++---
 .../client/util/ScheduledExecutorProvider.java     | 36 ++++++++++++++++++++++
 12 files changed, 78 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
index 3da676eb827..2f85d2430db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.transaction.pendingack;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.MutablePair;
@@ -38,7 +38,7 @@ public interface PendingAckStore {
      * @param pendingAckHandle the handle of pending ack
      * @param executorService the replay executor service
      */
-    void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService executorService);
+    void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService);
 
     /**
      * Close the transaction pending ack store.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
index d882c80c478..44c9fbe039b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.transaction.pendingack.impl;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
@@ -33,7 +33,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 public class InMemoryPendingAckStore implements PendingAckStore {
 
     @Override
-    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService scheduledExecutorService) {
+    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService scheduledExecutorService) {
         pendingAckHandle.changeToReadyState();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index 4ac0cd9b82c..8b115543561 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
@@ -110,7 +110,7 @@ public class MLPendingAckStore implements PendingAckStore {
     }
 
     @Override
-    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService transactionReplayExecutor) {
+    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService transactionReplayExecutor) {
         transactionReplayExecutor
                 .execute(new PendingAckReplay(new MLPendingAckReplyCallBack(pendingAckHandle)));
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 9f4d49f3f59..90bddef52d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -157,8 +156,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
                 this.pendingAckStoreFuture =
                         pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
                 this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-                    pendingAckStore.replayAsync(this,
-                            (ScheduledExecutorService) internalPinnedExecutor);
+                    pendingAckStore.replayAsync(this, internalPinnedExecutor);
                 }).exceptionally(e -> {
                     acceptQueue.clear();
                     changeToErrorState();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index c0ca352c153..0e2d300f7a8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -40,7 +40,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -128,7 +128,7 @@ public class PersistentSubscriptionTest {
             public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription subscription) {
                 return CompletableFuture.completedFuture(new PendingAckStore() {
                     @Override
-                    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService executorService) {
+                    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService) {
                         try {
                             Field field = PendingAckHandleState.class.getDeclaredField("state");
                             field.setAccessible(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index d8c8bd657f8..29ecb39853a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -72,7 +72,7 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase {
         return new PulsarClientImpl(conf) {
             {
                 ScheduledExecutorService internalExecutorService =
-                        (ScheduledExecutorService) super.getInternalExecutorService();
+                        (ScheduledExecutorService) super.getScheduledExecutorProvider().getExecutor();
                 internalExecutorServiceDelegate = mock(ScheduledExecutorService.class,
                         // a spy isn't used since that doesn't work for private classes, instead
                         // the mock delegatesTo an existing instance. A delegate is sufficient for verifying
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index ed8fb39a3ae..33c2a3cc266 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -33,7 +33,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.Lock;
@@ -73,8 +72,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected final MessageListener<T> listener;
     protected final ConsumerEventListener consumerEventListener;
     protected final ExecutorProvider executorProvider;
-    protected final ScheduledExecutorService externalPinnedExecutor;
-    protected final ScheduledExecutorService internalPinnedExecutor;
+    protected final ExecutorService externalPinnedExecutor;
+    protected final ExecutorService internalPinnedExecutor;
     final BlockingQueue<Message<T>> incomingMessages;
     protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
@@ -113,8 +112,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         this.unAckedChunkedMessageIdSequenceMap =
                 ConcurrentOpenHashMap.<MessageIdImpl, MessageIdImpl[]>newBuilder().build();
         this.executorProvider = executorProvider;
-        this.externalPinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
-        this.internalPinnedExecutor = (ScheduledExecutorService) client.getInternalExecutorService();
+        this.externalPinnedExecutor = executorProvider.getExecutor();
+        this.internalPinnedExecutor = client.getInternalExecutorService();
         this.pendingReceives = Queues.newConcurrentLinkedQueue();
         this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
         this.schema = schema;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 064d347fce2..4d59064a035 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -48,6 +48,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1363,10 +1364,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
         // Lazy task scheduling to expire incomplete chunk message
         if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
-            internalPinnedExecutor
-                    .scheduleAtFixedRate(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages),
-                            expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
-                            TimeUnit.MILLISECONDS);
+            ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate(
+                    () -> internalPinnedExecutor
+                            .execute(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages)),
+                    expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
+                    TimeUnit.MILLISECONDS
+            );
             expireChunkMessageTaskScheduled = true;
         }
 
@@ -2353,7 +2356,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 return;
             }
 
-            internalPinnedExecutor.schedule(() -> {
+            ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).schedule(() -> {
                 log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms",
                         topic, getHandlerName(), nextDelay);
                 remainingTime.addAndGet(-nextDelay);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 3353dea8733..68c5acbc3d0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -41,6 +41,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -278,7 +279,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 return null;
             }
             log.error("Receive operation failed on consumer {} - Retrying later", consumer, ex);
-            internalPinnedExecutor.schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
+            ((ScheduledExecutorService) client.getScheduledExecutorProvider())
+                    .schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
             return null;
         });
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index c2bf8a216e2..3c65339e91d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -77,6 +77,7 @@ import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvid
 import org.apache.pulsar.client.impl.transaction.TransactionBuilderImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
 import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.ScheduledExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -105,6 +106,8 @@ public class PulsarClientImpl implements PulsarClient {
     private boolean needStopTimer;
     private final ExecutorProvider externalExecutorProvider;
     private final ExecutorProvider internalExecutorProvider;
+
+    private final ScheduledExecutorProvider scheduledExecutorProvider;
     private final boolean createdEventLoopGroup;
     private final boolean createdCnxPool;
 
@@ -190,6 +193,8 @@ public class PulsarClientImpl implements PulsarClient {
                     new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
             this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
                     new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(),
+                    "pulsar-client-scheduled");
             if (conf.getServiceUrl().startsWith("http")) {
                 lookup = new HttpLookupService(conf, this.eventLoopGroup);
             } else {
@@ -996,7 +1001,7 @@ public class PulsarClientImpl implements PulsarClient {
             }
             previousExceptions.add(e);
 
-            ((ScheduledExecutorService) externalExecutorProvider.getExecutor()).schedule(() -> {
+            ((ScheduledExecutorService) scheduledExecutorProvider.getExecutor()).schedule(() -> {
                 log.warn("[topic: {}] Could not get connection while getPartitionedTopicMetadata -- "
                         + "Will try again in {} ms", topicName, nextDelay);
                 remainingTime.addAndGet(-nextDelay);
@@ -1118,6 +1123,11 @@ public class PulsarClientImpl implements PulsarClient {
     public ExecutorService getInternalExecutorService() {
         return internalExecutorProvider.getExecutor();
     }
+
+    public ScheduledExecutorProvider getScheduledExecutorProvider() {
+        return scheduledExecutorProvider;
+    }
+
     //
     // Transaction related API
     //
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index b5fb3543b82..67606af63a7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Getter;
@@ -41,7 +40,7 @@ public class ExecutorProvider {
     private final String poolName;
     private volatile boolean isShutdown;
 
-    private static class ExtendedThreadFactory extends DefaultThreadFactory {
+    protected static class ExtendedThreadFactory extends DefaultThreadFactory {
 
         @Getter
         private Thread thread;
@@ -56,7 +55,6 @@ public class ExecutorProvider {
         }
     }
 
-
     public ExecutorProvider(int numThreads, String poolName) {
         checkArgument(numThreads > 0);
         this.numThreads = numThreads;
@@ -65,13 +63,17 @@ public class ExecutorProvider {
         for (int i = 0; i < numThreads; i++) {
             ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(
                     poolName, Thread.currentThread().isDaemon());
-            ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
+            ExecutorService executor = createExecutor(threadFactory);
             executors.add(Pair.of(executor, threadFactory));
         }
         isShutdown = false;
         this.poolName = poolName;
     }
 
+    protected ExecutorService createExecutor(ExtendedThreadFactory threadFactory) {
+       return Executors.newSingleThreadExecutor(threadFactory);
+    }
+
     public ExecutorService getExecutor() {
         return executors.get((currentThread.getAndIncrement() & Integer.MAX_VALUE) % numThreads).getKey();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
new file mode 100644
index 00000000000..887ae3bb7ff
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ScheduledExecutorProvider extends ExecutorProvider {
+
+    public ScheduledExecutorProvider(int numThreads, String poolName) {
+        super(numThreads, poolName);
+    }
+
+    @Override
+    protected ExecutorService createExecutor(ExtendedThreadFactory threadFactory) {
+        return Executors.newSingleThreadScheduledExecutor(threadFactory);
+    }
+}