You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/10 06:52:52 UTC
[pulsar] 13/19: [improve][java-client] Improve performance of multi-topic consumer with more than one IO thread (#16336)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1649ef47559da2ec8e67c07917efd4f1f99a9743
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jul 5 14:31:30 2022 +0800
[improve][java-client] Improve performance of multi-topic consumer with more than one IO thread (#16336)
(cherry picked from commit bdda1ebd6843be6ffd9dff67adcd85ee367f4e93)
---
.../apache/pulsar/client/impl/MessagesImpl.java | 6 ++-
.../client/impl/MultiTopicsConsumerImpl.java | 60 +++++++++++++---------
2 files changed, 42 insertions(+), 24 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
index cfd77587344..e0a54c50fed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
@@ -29,7 +29,7 @@ import org.apache.pulsar.client.api.Messages;
@NotThreadSafe
public class MessagesImpl<T> implements Messages<T> {
- private List<Message<T>> messageList;
+ private final List<Message<T>> messageList;
private final int maxNumberOfMessages;
private final long maxSizeOfMessages;
@@ -80,6 +80,10 @@ public class MessagesImpl<T> implements Messages<T> {
this.messageList.clear();
}
+ List<Message<T>> getMessageList() {
+ return messageList;
+ }
+
@Override
public Iterator<Message<T>> iterator() {
return messageList.iterator();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 68c5acbc3d0..1108fd66520 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -49,6 +49,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
@@ -242,19 +243,25 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
if (getState() == State.Ready) {
newConsumers.forEach(consumer -> {
consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize());
- internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer));
+ internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer, true));
});
}
}
- private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
- consumer.receiveAsync().thenAcceptAsync(message -> {
+ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchReceive) {
+ CompletableFuture<List<Message<T>>> messagesFuture;
+ if (batchReceive) {
+ messagesFuture = consumer.batchReceiveAsync().thenApply(msgs -> ((MessagesImpl<T>) msgs).getMessageList());
+ } else {
+ messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList);
+ }
+ messagesFuture.thenAcceptAsync(messages -> {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Receive message from sub consumer:{}",
topic, subscription, consumer.getTopic());
}
// Process the message, add to the queue and trigger listener or async callback
- messageReceived(consumer, message);
+ messages.forEach(msg -> messageReceived(consumer, msg));
int size = incomingMessages.size();
if (size >= maxReceiverQueueSize
@@ -270,7 +277,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
} else {
// Call receiveAsync() if the incoming queue is not full. Because this block is run with
// thenAcceptAsync, there is no chance for recursion that would lead to stack overflow.
- receiveMessageFromConsumer(consumer);
+ receiveMessageFromConsumer(consumer, messages.size() > 0);
}
}, internalPinnedExecutor).exceptionally(ex -> {
if (ex instanceof PulsarClientException.AlreadyClosedException
@@ -279,8 +286,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
return null;
}
log.error("Receive operation failed on consumer {} - Retrying later", consumer, ex);
- ((ScheduledExecutorService) client.getScheduledExecutorProvider())
- .schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
+ ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor())
+ .schedule(() -> receiveMessageFromConsumer(consumer, true), 10, TimeUnit.SECONDS);
return null;
});
}
@@ -323,7 +330,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
internalPinnedExecutor.execute(() -> {
- receiveMessageFromConsumer(consumer);
+ receiveMessageFromConsumer(consumer, true);
});
}
}
@@ -1021,11 +1028,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
partitionIndex -> {
String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString();
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
- ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
- configurationData, client.externalExecutorProvider(),
- partitionIndex, true, listener != null, subFuture,
- startMessageId, schema, interceptors,
- createIfDoesNotExist, startMessageRollbackDurationInSec);
+ ConsumerImpl<T> newConsumer = createInternalConsumer(configurationData, partitionName,
+ partitionIndex, subFuture, createIfDoesNotExist, schema);
synchronized (pauseMutex) {
if (paused) {
newConsumer.pause();
@@ -1048,11 +1052,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
subscribeResult.completeExceptionally(new PulsarClientException(errorMessage));
return existingValue;
} else {
- ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
- client.externalExecutorProvider(), -1,
- true, listener != null, subFuture, startMessageId, schema, interceptors,
- createIfDoesNotExist, startMessageRollbackDurationInSec);
-
+ ConsumerImpl<T> newConsumer = createInternalConsumer(internalConfig, topicName,
+ -1, subFuture, createIfDoesNotExist, schema);
synchronized (pauseMutex) {
if (paused) {
newConsumer.pause();
@@ -1092,6 +1093,22 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
});
}
+ private ConsumerImpl<T> createInternalConsumer(ConsumerConfigurationData<T> configurationData, String partitionName,
+ int partitionIndex, CompletableFuture<Consumer<T>> subFuture,
+ boolean createIfDoesNotExist, Schema<T> schema) {
+ BatchReceivePolicy internalBatchReceivePolicy = BatchReceivePolicy.builder()
+ .maxNumMessages(Math.max(configurationData.getReceiverQueueSize() / 2, 1))
+ .maxNumBytes(-1)
+ .timeout(1, TimeUnit.MILLISECONDS)
+ .build();
+ configurationData.setBatchReceivePolicy(internalBatchReceivePolicy);
+ return ConsumerImpl.newConsumerImpl(client, partitionName,
+ configurationData, client.externalExecutorProvider(),
+ partitionIndex, true, listener != null, subFuture,
+ startMessageId, schema, interceptors,
+ createIfDoesNotExist, startMessageRollbackDurationInSec);
+ }
+
// handling failure during subscribe new topic, unsubscribe success created partitions
private void handleSubscribeOneTopicError(String topicName,
Throwable error,
@@ -1348,11 +1365,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
int partitionIndex = TopicName.getPartitionIndex(partitionName);
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerConfigurationData<T> configurationData = getInternalConsumerConfig();
- ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
- client, partitionName, configurationData,
- client.externalExecutorProvider(),
- partitionIndex, true, listener != null, subFuture, startMessageId, schema, interceptors,
- true /* createTopicIfDoesNotExist */, startMessageRollbackDurationInSec);
+ ConsumerImpl<T> newConsumer = createInternalConsumer(configurationData, partitionName,
+ partitionIndex, subFuture, true, schema);
synchronized (pauseMutex) {
if (paused) {
newConsumer.pause();