Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:31 UTC
[pulsar] 26/29: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption (#16236)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 63f5289865a04114c93ab3174d7f77c54ecbd340
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jun 28 11:29:32 2022 +0800
[improve][java-client] Replace ScheduledExecutor to improve performance of message consumption (#16236)
### Motivation
A `ScheduledExecutorService` is inefficient for this workload because every task is first added to a `DelayedWorkQueue` (a priority queue), even when it is submitted through `.execute()` with no scheduling delay.
<img width="1845" alt="image" src="https://user-images.githubusercontent.com/12592133/175871343-ecda138f-43a2-472e-ac42-8efdefb58810.png">
<img width="1848" alt="image" src="https://user-images.githubusercontent.com/12592133/175871415-3d8d9fbd-f140-4a4b-a78d-306c1ec9673c.png">
Profile result:
[perf_consumer_0.html.txt](https://github.com/apache/pulsar/files/8989093/perf_consumer_0.html.txt)
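The queueing behavior described above can be checked with a small standalone sketch. It is not part of this commit: the class name `ExecutorQueueDemo` and the helper `queueTypeOf` are hypothetical, and the pools are built directly from the JDK constructors (rather than through `Executors`) so that the backing queue can be inspected. In OpenJDK, `ScheduledThreadPoolExecutor.execute()` delegates to `schedule()` with a zero delay, so the task still passes through the delay queue.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorQueueDemo {

    /** Returns the simple class name of the work queue backing the given pool. */
    public static String queueTypeOf(ThreadPoolExecutor executor) {
        return executor.getQueue().getClass().getSimpleName();
    }

    public static void main(String[] args) {
        // Scheduled pool: tasks are ordered by trigger time in a heap-based queue.
        ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(1);
        // Plain single-thread pool: tasks go through a FIFO linked queue.
        ThreadPoolExecutor plain = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            System.out.println("scheduled pool queue: " + queueTypeOf(scheduled));
            System.out.println("plain pool queue:     " + queueTypeOf(plain));
        } finally {
            scheduled.shutdown();
            plain.shutdown();
        }
    }
}
```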
Performance test measuring the maximum message read rate on a single topic:
```
bin/pulsar-perf consume test -q 1000000 -p 100000000
bin/pulsar-perf produce test -r 1000000 -s 1 -mk random -o 10000 -threads 2
```
Without this PR (2.10.1):
```
Profiling started
2022-06-27T13:44:01,183+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 23919664 msg --- 265702.851 msg/s --- 2.027 Mbit/s --- Latency: mean: 49430.572 ms - med: 49406 - 95pct: 52853 - 99pct: 53024 - 99.9pct: 53053 - 99.99pct: 53056 - Max: 53057
2022-06-27T13:44:11,196+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 26690802 msg --- 276759.125 msg/s --- 2.112 Mbit/s --- Latency: mean: 56106.186 ms - med: 56000 - 95pct: 59289 - 99pct: 59985 - 99.9pct: 60037 - 99.99pct: 60042 - Max: 60042
2022-06-27T13:44:21,216+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 28788693 msg --- 209467.861 msg/s --- 1.598 Mbit/s --- Latency: mean: 63523.143 ms - med: 63580 - 95pct: 67202 - 99pct: 67523 - 99.9pct: 67547 - 99.99pct: 67548 - Max: 67548
2022-06-27T13:44:31,233+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 31255365 msg --- 246190.932 msg/s --- 1.878 Mbit/s --- Latency: mean: 71152.370 ms - med: 71058 - 95pct: 74555 - 99pct: 74806 - 99.9pct: 74842 - 99.99pct: 74847 - Max: 74847
2022-06-27T13:44:41,247+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 33606630 msg --- 234769.313 msg/s --- 1.791 Mbit/s --- Latency: mean: 78636.478 ms - med: 78724 - 95pct: 81694 - 99pct: 82090 - 99.9pct: 82279 - 99.99pct: 82285 - Max: 82286
```
With this PR:
```
Profiling started
2022-06-27T13:56:20,426+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 431272207 msg --- 1079360.516 msg/s --- 8.235 Mbit/s --- Latency: mean: 272.645 ms - med: 334 - 95pct: 470 - 99pct: 510 - 99.9pct: 522 - 99.99pct: 523 - Max: 524
2022-06-27T13:56:30,438+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 441292346 msg --- 1000645.852 msg/s --- 7.634 Mbit/s --- Latency: mean: 15.512 ms - med: 14 - 95pct: 29 - 99pct: 39 - 99.9pct: 54 - 99.99pct: 55 - Max: 55
2022-06-27T13:56:40,450+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 451303308 msg --- 999973.040 msg/s --- 7.629 Mbit/s --- Latency: mean: 18.265 ms - med: 14 - 95pct: 53 - 99pct: 98 - 99.9pct: 174 - 99.99pct: 176 - Max: 177
2022-06-27T13:56:50,462+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 461308082 msg --- 999309.458 msg/s --- 7.624 Mbit/s --- Latency: mean: 14.728 ms - med: 14 - 95pct: 18 - 99pct: 41 - 99.9pct: 50 - 99.99pct: 51 - Max: 52
2022-06-27T13:57:00,475+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 471327606 msg --- 1000738.584 msg/s --- 7.635 Mbit/s --- Latency: mean: 21.291 ms - med: 16 - 95pct: 52 - 99pct: 61 - 99.9pct: 65 - 99.99pct: 66 - Max: 66
```
Profile result with this PR:
[perf_consumer_1.html.txt](https://github.com/apache/pulsar/files/8989095/perf_consumer_1.html.txt)
### Modification
- Change the internal and external executors to plain `ExecutorService` instances.
- Add a new `ScheduledExecutorProvider` to handle scheduled tasks.
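The resulting split of responsibilities can be illustrated with a minimal, self-contained sketch. It assumes one dedicated timer executor and one plain "pinned" executor; the class name `TimerHandoffDemo`, the method `runOnce`, and the thread names are hypothetical and do not appear in the Pulsar code. The timer thread only triggers the work, and the work itself is handed to the plain executor through `execute()`, which is the same shape the commit uses for the chunked-message expiry task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimerHandoffDemo {

    /** Schedules one delayed task and returns the name of the thread that ran its body. */
    public static String runOnce() {
        ExecutorService pinned =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "pinned-worker"));
        ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "timer"));
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        try {
            // The timer thread only fires the trigger; the body runs on the pinned executor.
            timer.schedule(
                    () -> pinned.execute(() -> ranOn.complete(Thread.currentThread().getName())),
                    10, TimeUnit.MILLISECONDS);
            return ranOn.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("delayed task did not complete", e);
        } finally {
            timer.shutdownNow();
            pinned.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("task body ran on: " + runOnce());
    }
}
```

Keeping the body on the pinned executor preserves whatever single-threaded ordering the consumer relies on, while the hot path no longer goes through a delay queue.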
(cherry picked from commit 96237a9615fefa2bed247b416bf1a12d8bc4b201)
---
.../transaction/pendingack/PendingAckStore.java | 4 +--
.../pendingack/impl/InMemoryPendingAckStore.java | 4 +--
.../pendingack/impl/MLPendingAckStore.java | 4 +--
.../pendingack/impl/PendingAckHandleImpl.java | 4 +--
.../persistent/PersistentSubscriptionTest.java | 4 +--
.../pulsar/client/api/MultiTopicsConsumerTest.java | 2 +-
.../apache/pulsar/client/impl/ConsumerBase.java | 9 +++---
.../apache/pulsar/client/impl/ConsumerImpl.java | 13 +++++---
.../client/impl/MultiTopicsConsumerImpl.java | 4 ++-
.../pulsar/client/impl/PulsarClientImpl.java | 12 +++++++-
.../pulsar/client/util/ExecutorProvider.java | 10 +++---
.../client/util/ScheduledExecutorProvider.java | 36 ++++++++++++++++++++++
12 files changed, 78 insertions(+), 28 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
index 3da676eb827..2f85d2430db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.transaction.pendingack;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
@@ -38,7 +38,7 @@ public interface PendingAckStore {
* @param pendingAckHandle the handle of pending ack
* @param executorService the replay executor service
*/
- void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService executorService);
+ void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService);
/**
* Close the transaction pending ack store.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
index d882c80c478..44c9fbe039b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.transaction.pendingack.impl;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
@@ -33,7 +33,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
public class InMemoryPendingAckStore implements PendingAckStore {
@Override
- public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService scheduledExecutorService) {
+ public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService scheduledExecutorService) {
pendingAckHandle.changeToReadyState();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index 4ac0cd9b82c..8b115543561 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -26,7 +26,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
@@ -110,7 +110,7 @@ public class MLPendingAckStore implements PendingAckStore {
}
@Override
- public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService transactionReplayExecutor) {
+ public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService transactionReplayExecutor) {
transactionReplayExecutor
.execute(new PendingAckReplay(new MLPendingAckReplyCallBack(pendingAckHandle)));
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 9f4d49f3f59..90bddef52d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -157,8 +156,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
this.pendingAckStoreFuture =
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
- pendingAckStore.replayAsync(this,
- (ScheduledExecutorService) internalPinnedExecutor);
+ pendingAckStore.replayAsync(this, internalPinnedExecutor);
}).exceptionally(e -> {
acceptQueue.clear();
changeToErrorState();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index c0ca352c153..0e2d300f7a8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -40,7 +40,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -128,7 +128,7 @@ public class PersistentSubscriptionTest {
public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription subscription) {
return CompletableFuture.completedFuture(new PendingAckStore() {
@Override
- public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService executorService) {
+ public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService) {
try {
Field field = PendingAckHandleState.class.getDeclaredField("state");
field.setAccessible(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index d8c8bd657f8..29ecb39853a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -72,7 +72,7 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase {
return new PulsarClientImpl(conf) {
{
ScheduledExecutorService internalExecutorService =
- (ScheduledExecutorService) super.getInternalExecutorService();
+ (ScheduledExecutorService) super.getScheduledExecutorProvider().getExecutor();
internalExecutorServiceDelegate = mock(ScheduledExecutorService.class,
// a spy isn't used since that doesn't work for private classes, instead
// the mock delegatesTo an existing instance. A delegate is sufficient for verifying
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index ed8fb39a3ae..33c2a3cc266 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -33,7 +33,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Lock;
@@ -73,8 +72,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected final MessageListener<T> listener;
protected final ConsumerEventListener consumerEventListener;
protected final ExecutorProvider executorProvider;
- protected final ScheduledExecutorService externalPinnedExecutor;
- protected final ScheduledExecutorService internalPinnedExecutor;
+ protected final ExecutorService externalPinnedExecutor;
+ protected final ExecutorService internalPinnedExecutor;
final BlockingQueue<Message<T>> incomingMessages;
protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
@@ -113,8 +112,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
this.unAckedChunkedMessageIdSequenceMap =
ConcurrentOpenHashMap.<MessageIdImpl, MessageIdImpl[]>newBuilder().build();
this.executorProvider = executorProvider;
- this.externalPinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
- this.internalPinnedExecutor = (ScheduledExecutorService) client.getInternalExecutorService();
+ this.externalPinnedExecutor = executorProvider.getExecutor();
+ this.internalPinnedExecutor = client.getInternalExecutorService();
this.pendingReceives = Queues.newConcurrentLinkedQueue();
this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
this.schema = schema;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 064d347fce2..4d59064a035 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -48,6 +48,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -1363,10 +1364,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
// Lazy task scheduling to expire incomplete chunk message
if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
- internalPinnedExecutor
- .scheduleAtFixedRate(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages),
- expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
- TimeUnit.MILLISECONDS);
+ ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate(
+ () -> internalPinnedExecutor
+ .execute(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages)),
+ expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
+ TimeUnit.MILLISECONDS
+ );
expireChunkMessageTaskScheduled = true;
}
@@ -2353,7 +2356,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return;
}
- internalPinnedExecutor.schedule(() -> {
+ ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).schedule(() -> {
log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms",
topic, getHandlerName(), nextDelay);
remainingTime.addAndGet(-nextDelay);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 3353dea8733..68c5acbc3d0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -41,6 +41,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -278,7 +279,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
return null;
}
log.error("Receive operation failed on consumer {} - Retrying later", consumer, ex);
- internalPinnedExecutor.schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
+ ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor())
+ .schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
return null;
});
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index c2bf8a216e2..3c65339e91d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -77,6 +77,7 @@ import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvid
import org.apache.pulsar.client.impl.transaction.TransactionBuilderImpl;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
@@ -105,6 +106,8 @@ public class PulsarClientImpl implements PulsarClient {
private boolean needStopTimer;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorProvider;
+
+ private final ScheduledExecutorProvider scheduledExecutorProvider;
private final boolean createdEventLoopGroup;
private final boolean createdCnxPool;
@@ -190,6 +193,8 @@ public class PulsarClientImpl implements PulsarClient {
new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+ this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(),
+ "pulsar-client-scheduled");
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, this.eventLoopGroup);
} else {
@@ -996,7 +1001,7 @@ public class PulsarClientImpl implements PulsarClient {
}
previousExceptions.add(e);
- ((ScheduledExecutorService) externalExecutorProvider.getExecutor()).schedule(() -> {
+ ((ScheduledExecutorService) scheduledExecutorProvider.getExecutor()).schedule(() -> {
log.warn("[topic: {}] Could not get connection while getPartitionedTopicMetadata -- "
+ "Will try again in {} ms", topicName, nextDelay);
remainingTime.addAndGet(-nextDelay);
@@ -1118,6 +1123,11 @@ public class PulsarClientImpl implements PulsarClient {
public ExecutorService getInternalExecutorService() {
return internalExecutorProvider.getExecutor();
}
+
+ public ScheduledExecutorProvider getScheduledExecutorProvider() {
+ return scheduledExecutorProvider;
+ }
+
//
// Transaction related API
//
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index b5fb3543b82..67606af63a7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
@@ -41,7 +40,7 @@ public class ExecutorProvider {
private final String poolName;
private volatile boolean isShutdown;
- private static class ExtendedThreadFactory extends DefaultThreadFactory {
+ protected static class ExtendedThreadFactory extends DefaultThreadFactory {
@Getter
private Thread thread;
@@ -56,7 +55,6 @@ public class ExecutorProvider {
}
}
-
public ExecutorProvider(int numThreads, String poolName) {
checkArgument(numThreads > 0);
this.numThreads = numThreads;
@@ -65,13 +63,17 @@ public class ExecutorProvider {
for (int i = 0; i < numThreads; i++) {
ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(
poolName, Thread.currentThread().isDaemon());
- ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
+ ExecutorService executor = createExecutor(threadFactory);
executors.add(Pair.of(executor, threadFactory));
}
isShutdown = false;
this.poolName = poolName;
}
+ protected ExecutorService createExecutor(ExtendedThreadFactory threadFactory) {
+ return Executors.newSingleThreadExecutor(threadFactory);
+ }
+
public ExecutorService getExecutor() {
return executors.get((currentThread.getAndIncrement() & Integer.MAX_VALUE) % numThreads).getKey();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
new file mode 100644
index 00000000000..887ae3bb7ff
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ScheduledExecutorProvider extends ExecutorProvider {
+
+ public ScheduledExecutorProvider(int numThreads, String poolName) {
+ super(numThreads, poolName);
+ }
+
+ @Override
+ protected ExecutorService createExecutor(ExtendedThreadFactory threadFactory) {
+ return Executors.newSingleThreadScheduledExecutor(threadFactory);
+ }
+}
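The provider arrangement in the two files above can be summarized in a simplified sketch. It is an illustration under assumptions, not the Pulsar classes: `ProviderSketch`, `SimpleProvider`, and `SimpleScheduledProvider` are hypothetical names, and thread factories, naming, and shutdown handling are reduced to the minimum. The base class builds its executors through an overridable factory method and hands them out round-robin; the subclass only swaps the factory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ProviderSketch {

    /** Hands out plain single-thread executors in round-robin order. */
    public static class SimpleProvider {
        private final List<ExecutorService> executors = new ArrayList<>();
        private final AtomicInteger next = new AtomicInteger();

        public SimpleProvider(int numThreads) {
            if (numThreads <= 0) {
                throw new IllegalArgumentException("numThreads must be positive");
            }
            for (int i = 0; i < numThreads; i++) {
                executors.add(createExecutor());
            }
        }

        /** Factory hook: subclasses choose the executor implementation. */
        protected ExecutorService createExecutor() {
            return Executors.newSingleThreadExecutor();
        }

        public ExecutorService getExecutor() {
            // Masking keeps the index non-negative after the counter overflows.
            return executors.get((next.getAndIncrement() & Integer.MAX_VALUE) % executors.size());
        }

        public void shutdown() {
            executors.forEach(ExecutorService::shutdownNow);
        }
    }

    /** Same round-robin behavior, but every executor supports delayed tasks. */
    public static class SimpleScheduledProvider extends SimpleProvider {
        public SimpleScheduledProvider(int numThreads) {
            super(numThreads);
        }

        @Override
        protected ExecutorService createExecutor() {
            return Executors.newSingleThreadScheduledExecutor();
        }
    }
}
```

Because the factory method is called from the base constructor, an override should not depend on fields of the subclass; that holds in this sketch, where the override uses no instance state. Callers of the scheduled variant still receive an `ExecutorService` and must cast it to `ScheduledExecutorService` before calling `schedule()`.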