You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/10 06:52:52 UTC

[pulsar] 13/19: [improve][java-client] Improve performance of multi-topic consumer with more than one IO thread (#16336)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1649ef47559da2ec8e67c07917efd4f1f99a9743
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jul 5 14:31:30 2022 +0800

    [improve][java-client] Improve performance of multi-topic consumer with more than one IO thread (#16336)
    
    (cherry picked from commit bdda1ebd6843be6ffd9dff67adcd85ee367f4e93)
---
 .../apache/pulsar/client/impl/MessagesImpl.java    |  6 ++-
 .../client/impl/MultiTopicsConsumerImpl.java       | 60 +++++++++++++---------
 2 files changed, 42 insertions(+), 24 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
index cfd77587344..e0a54c50fed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
@@ -29,7 +29,7 @@ import org.apache.pulsar.client.api.Messages;
 @NotThreadSafe
 public class MessagesImpl<T> implements Messages<T> {
 
-    private List<Message<T>> messageList;
+    private final List<Message<T>> messageList;
 
     private final int maxNumberOfMessages;
     private final long maxSizeOfMessages;
@@ -80,6 +80,10 @@ public class MessagesImpl<T> implements Messages<T> {
         this.messageList.clear();
     }
 
+    List<Message<T>> getMessageList() {
+        return messageList;
+    }
+
     @Override
     public Iterator<Message<T>> iterator() {
         return  messageList.iterator();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 68c5acbc3d0..1108fd66520 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -49,6 +49,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
@@ -242,19 +243,25 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         if (getState() == State.Ready) {
             newConsumers.forEach(consumer -> {
                 consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize());
-                internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer));
+                internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer, true));
             });
         }
     }
 
-    private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
-        consumer.receiveAsync().thenAcceptAsync(message -> {
+    private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchReceive) {
+        CompletableFuture<List<Message<T>>> messagesFuture;
+        if (batchReceive) {
+            messagesFuture = consumer.batchReceiveAsync().thenApply(msgs -> ((MessagesImpl<T>) msgs).getMessageList());
+        } else {
+            messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList);
+        }
+        messagesFuture.thenAcceptAsync(messages -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Receive message from sub consumer:{}",
                     topic, subscription, consumer.getTopic());
             }
             // Process the message, add to the queue and trigger listener or async callback
-            messageReceived(consumer, message);
+            messages.forEach(msg -> messageReceived(consumer, msg));
 
             int size = incomingMessages.size();
             if (size >= maxReceiverQueueSize
@@ -270,7 +277,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             } else {
                 // Call receiveAsync() if the incoming queue is not full. Because this block is run with
                 // thenAcceptAsync, there is no chance for recursion that would lead to stack overflow.
-                receiveMessageFromConsumer(consumer);
+                receiveMessageFromConsumer(consumer, messages.size() > 0);
             }
         }, internalPinnedExecutor).exceptionally(ex -> {
             if (ex instanceof PulsarClientException.AlreadyClosedException
@@ -279,8 +286,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 return null;
             }
             log.error("Receive operation failed on consumer {} - Retrying later", consumer, ex);
-            ((ScheduledExecutorService) client.getScheduledExecutorProvider())
-                    .schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
+            ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor())
+                    .schedule(() -> receiveMessageFromConsumer(consumer, true), 10, TimeUnit.SECONDS);
             return null;
         });
     }
@@ -323,7 +330,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 }
 
                 internalPinnedExecutor.execute(() -> {
-                    receiveMessageFromConsumer(consumer);
+                    receiveMessageFromConsumer(consumer, true);
                 });
             }
         }
@@ -1021,11 +1028,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                     partitionIndex -> {
                         String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString();
                         CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
-                        ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
-                                    configurationData, client.externalExecutorProvider(),
-                                    partitionIndex, true, listener != null, subFuture,
-                                    startMessageId, schema, interceptors,
-                                    createIfDoesNotExist, startMessageRollbackDurationInSec);
+                        ConsumerImpl<T> newConsumer = createInternalConsumer(configurationData, partitionName,
+                                partitionIndex, subFuture, createIfDoesNotExist, schema);
                         synchronized (pauseMutex) {
                             if (paused) {
                                 newConsumer.pause();
@@ -1048,11 +1052,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                     subscribeResult.completeExceptionally(new PulsarClientException(errorMessage));
                     return existingValue;
                 } else {
-                    ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
-                            client.externalExecutorProvider(), -1,
-                            true, listener != null, subFuture, startMessageId, schema, interceptors,
-                            createIfDoesNotExist, startMessageRollbackDurationInSec);
-
+                    ConsumerImpl<T> newConsumer = createInternalConsumer(internalConfig, topicName,
+                            -1, subFuture, createIfDoesNotExist, schema);
                     synchronized (pauseMutex) {
                         if (paused) {
                             newConsumer.pause();
@@ -1092,6 +1093,22 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             });
     }
 
+    private ConsumerImpl<T> createInternalConsumer(ConsumerConfigurationData<T> configurationData, String partitionName,
+                                                   int partitionIndex, CompletableFuture<Consumer<T>> subFuture,
+                                                   boolean createIfDoesNotExist, Schema<T> schema) {
+        BatchReceivePolicy internalBatchReceivePolicy = BatchReceivePolicy.builder()
+                .maxNumMessages(Math.max(configurationData.getReceiverQueueSize() / 2, 1))
+                .maxNumBytes(-1)
+                .timeout(1, TimeUnit.MILLISECONDS)
+                .build();
+        configurationData.setBatchReceivePolicy(internalBatchReceivePolicy);
+        return ConsumerImpl.newConsumerImpl(client, partitionName,
+                configurationData, client.externalExecutorProvider(),
+                partitionIndex, true, listener != null, subFuture,
+                startMessageId, schema, interceptors,
+                createIfDoesNotExist, startMessageRollbackDurationInSec);
+    }
+
     // handling failure during subscribe new topic, unsubscribe success created partitions
     private void handleSubscribeOneTopicError(String topicName,
                                               Throwable error,
@@ -1348,11 +1365,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         int partitionIndex = TopicName.getPartitionIndex(partitionName);
                         CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
                         ConsumerConfigurationData<T> configurationData = getInternalConsumerConfig();
-                        ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
-                                client, partitionName, configurationData,
-                                client.externalExecutorProvider(),
-                                partitionIndex, true, listener != null, subFuture, startMessageId, schema, interceptors,
-                                true /* createTopicIfDoesNotExist */, startMessageRollbackDurationInSec);
+                        ConsumerImpl<T> newConsumer = createInternalConsumer(configurationData, partitionName,
+                                partitionIndex, subFuture, true, schema);
                         synchronized (pauseMutex) {
                             if (paused) {
                                 newConsumer.pause();