You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:24 UTC

[pulsar] 19/29: [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests (#16160)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6ed4ed058789bf41995a9f364b48500f401bbc26
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jun 23 09:04:25 2022 +0800

    [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests (#16160)
    
    The consumer applies the default batch receive policy even if the user never uses the batch receive API.
    
    https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java#L60-L61
    
    This consumes a lot of CPU when the client has many consumers (e.g., 100k consumers).
    
    The problem can also be reproduced with the Pulsar perf tool by running a test with many consumers.
    
    If a consumer has no pending batch receive operation, there is no need to trigger the
    batch timeout task periodically. Instead, the timeout check is started only after a batch
    receive request has been added to the pending request queue.
    
    Remove the lock in MultiTopicsConsumerImpl, following the same approach as #10352.
    
    Added a new test to verify that the batch receive timeout task is not started when there is
    no pending batch receive request.
    
    (cherry picked from commit a0ccdc96bb05d19651f3778c23b89425d516d77a)
---
 .../client/api/ConsumerBatchReceiveTest.java       | 47 ++++++++++++++++++++++
 .../pulsar/client/impl/MessageChunkingTest.java    |  1 +
 .../apache/pulsar/client/impl/ConsumerBase.java    | 20 +++++++--
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  1 +
 .../client/impl/MultiTopicsConsumerImpl.java       | 20 +++------
 .../client/impl/MultiTopicsConsumerImplTest.java   |  2 +-
 6 files changed, 71 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
index 8f3d6423afb..8109e8ce8eb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.client.api;
 
 import lombok.Cleanup;
+import org.apache.pulsar.client.impl.ConsumerBase;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -48,6 +50,14 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @DataProvider(name = "partitioned")
+    public Object[][] partitionedTopicProvider() {
+        return new Object[][] {
+            { true },
+            { false }
+        };
+    }
+
     @DataProvider(name = "batchReceivePolicy")
     public Object[][] batchReceivePolicyProvider() {
         return new Object[][] {
@@ -425,6 +435,43 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         latch.await();
     }
 
+    @Test(dataProvider = "partitioned")
+    public void testBatchReceiveTimeoutTask(boolean partitioned) throws Exception {
+        final String topic = "persistent://my-property/my-ns/batch-receive-" + UUID.randomUUID();
+
+        if (partitioned) {
+            admin.topics().createPartitionedTopic(topic, 3);
+        }
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .receiverQueueSize(1)
+                .batchReceivePolicy(BatchReceivePolicy.builder()
+                        .maxNumBytes(1024 * 1024)
+                        .maxNumMessages(1)
+                        .timeout(5, TimeUnit.SECONDS)
+                        .build())
+                .subscribe();
+        Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout());
+        final int messagesToSend = 500;
+        sendMessagesAsyncAndWait(producer, messagesToSend);
+        for (int i = 0; i < 100; i++) {
+            Assert.assertNotNull(consumer.receive());
+        }
+        Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout());
+        for (int i = 0; i < 400; i++) {
+            Messages<String> batchReceived = consumer.batchReceive();
+            Assert.assertEquals(batchReceived.size(), 1);
+        }
+        Awaitility.await().untilAsserted(() -> Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout()));
+        Assert.assertEquals(consumer.batchReceive().size(), 0);
+        Awaitility.await().untilAsserted(() -> Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout()));
+    }
+
 
     private void receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(Consumer<String> consumer,
                                                                             BatchReceivePolicy batchReceivePolicy,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 3f3557290b5..c223208a8ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
 import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 689c4eb7405..ed8fb39a3ae 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -149,7 +149,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
         }
 
-        if (batchReceivePolicy.getTimeoutMs() > 0) {
+    }
+
+    protected void triggerBatchReceiveTimeoutTask() {
+        if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() > 0) {
             batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask,
                     batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS);
         }
@@ -905,7 +908,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         }
 
         long timeToWaitMs;
-
+        boolean hasPendingReceives = false;
         synchronized (this) {
             // If it's closing/closed we need to ignore this timeout and not schedule next timeout.
             if (getState() == State.Closing || getState() == State.Closed) {
@@ -942,13 +945,18 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
                 } else {
                     // The diff is greater than zero, set the timeout to the diff value
                     timeToWaitMs = diff;
+                    hasPendingReceives = true;
                     break;
                 }
 
                 opBatchReceive = pendingBatchReceives.peek();
             }
-            batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask,
-                    timeToWaitMs, TimeUnit.MILLISECONDS);
+            if (hasPendingReceives) {
+                batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask,
+                        timeToWaitMs, TimeUnit.MILLISECONDS);
+            } else {
+                batchReceiveTimeout = null;
+            }
         }
     }
 
@@ -1091,5 +1099,9 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         return true;
     }
 
+    public boolean hasBatchReceiveTimeout() {
+        return batchReceiveTimeout != null;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 40fc120464e..064d347fce2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -540,6 +540,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             } else {
                 OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
                 pendingBatchReceives.add(opBatchReceive);
+                triggerBatchReceiveTimeoutTask();
                 cancellationHandler.setCancelAction(() -> pendingBatchReceives.remove(opBatchReceive));
             }
         });
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index abfa5f72155..3353dea8733 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -44,8 +44,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -98,8 +96,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     private volatile Timeout partitionsAutoUpdateTimeout = null;
     TopicsPartitionChangedListener topicsPartitionChangedListener;
     CompletableFuture<Void> partitionsAutoUpdateFuture = null;
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
     private final ConsumerStatsRecorder stats;
     private UnAckedMessageTracker unAckedMessageTracker;
     private final ConsumerConfigurationData<T> internalConfig;
@@ -407,8 +403,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
         CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
         CompletableFuture<Messages<T>> result = cancellationHandler.createFuture();
-        try {
-            lock.writeLock().lock();
+        internalPinnedExecutor.execute(() -> {
             if (hasEnoughMessagesForBatchReceive()) {
                 MessagesImpl<T> messages = getNewMessagesImpl();
                 Message<T> msgPeeked = incomingMessages.peek();
@@ -429,13 +424,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             } else {
                 OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
                 pendingBatchReceives.add(opBatchReceive);
+                triggerBatchReceiveTimeoutTask();
                 cancellationHandler.setCancelAction(() -> pendingBatchReceives.remove(opBatchReceive));
             }
             resumeReceivingFromPausedConsumersIfNeeded();
-        } finally {
-            lock.writeLock().unlock();
-        }
-
+        });
         return result;
     }
 
@@ -677,8 +670,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     @Override
     public void redeliverUnacknowledgedMessages() {
-        lock.writeLock().lock();
-        try {
+        internalPinnedExecutor.execute(() -> {
             CONSUMER_EPOCH.incrementAndGet(this);
             consumers.values().stream().forEach(consumer -> {
                 consumer.redeliverUnacknowledgedMessages();
@@ -686,9 +678,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             });
             clearIncomingMessages();
             unAckedMessageTracker.clear();
-        } finally {
-            lock.writeLock().unlock();
-        }
+        });
         resumeReceivingFromPausedConsumersIfNeeded();
     }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 95dc07c2625..297059b5fee 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -176,7 +176,7 @@ public class MultiTopicsConsumerImplTest {
         // given
         MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
         CompletableFuture<Messages<byte[]>> future = consumer.batchReceiveAsync();
-        assertTrue(consumer.hasPendingBatchReceive());
+        Awaitility.await().untilAsserted(() -> assertTrue(consumer.hasPendingBatchReceive()));
         // when
         future.cancel(true);
         // then