You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/12 13:44:56 UTC

[pulsar] 05/17: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize (#6862)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c6bbfc6d59c83ee78688b4b7dd177277aa8e4c72
Author: hangc0276 <ha...@163.com>
AuthorDate: Mon May 11 08:41:11 2020 +0800

    fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize (#6862)
    
    Fix #6854
    
    ### Bug description
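    The consumer gets stuck because of the check in `hasEnoughMessagesForBatchReceive`. When `maxNumMessages` is greater than `maxReceiverQueueSize`, `incomingMessages.size()` can never reach `maxNumMessages`, so the check below never returns true: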
    ```
    protected boolean hasEnoughMessagesForBatchReceive() {
            if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
                return false;
            }
            return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
                    || (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes());
        }
    ```
    
    ### Changes
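    When the batchReceivePolicy `maxNumMessages` is greater than `maxReceiverQueueSize`, we cap `maxNumMessages` at `maxReceiverQueueSize` so that the consumer cannot get stuck in the `hasEnoughMessagesForBatchReceive` check. When both `maxNumMessages` and `maxNumBytes` are non-positive, the policy is reset to the `DEFAULT_POLICY` limits while keeping the user-supplied timeout. A warning is logged in both cases.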
    
    * fix consumer getting stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize
    
    * log a warning and add test cases
    (cherry picked from commit 561868d4c441d654ab795a9386d8e9a5e28f03dd)
---
 .../client/api/ConsumerBatchReceiveTest.java       | 124 ++++++++++++++++-----
 .../pulsar/client/api/BatchReceivePolicy.java      |   2 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    |  29 ++++-
 .../client/impl/MultiTopicsConsumerImpl.java       |   2 +-
 4 files changed, 128 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
index 19ec983..54a6e52 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
@@ -55,96 +55,166 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         return new Object[][] {
 
                 // Default batch receive policy.
-                { BatchReceivePolicy.DEFAULT_POLICY, true },
+                { BatchReceivePolicy.DEFAULT_POLICY, true, 1000},
                 // Only receive timeout limitation.
                 { BatchReceivePolicy.builder()
                         .timeout(50, TimeUnit.MILLISECONDS)
-                        .build(), true
+                        .build(), true, 1000
                 },
                 // Only number of messages in a single batch receive limitation.
                 { BatchReceivePolicy.builder()
                         .maxNumMessages(10)
-                        .build(), true
+                        .build(), true, 1000
                 },
                 // Number of messages and timeout limitation
                 { BatchReceivePolicy.builder()
                         .maxNumMessages(13)
                         .timeout(50, TimeUnit.MILLISECONDS)
-                        .build(), true
+                        .build(), true, 1000
                 },
                 // Size of messages and timeout limitation
                 { BatchReceivePolicy.builder()
                         .maxNumBytes(64)
                         .timeout(50, TimeUnit.MILLISECONDS)
-                        .build(), true
+                        .build(), true, 1000
                 },
                 // Default batch receive policy.
-                { BatchReceivePolicy.DEFAULT_POLICY, false },
+                { BatchReceivePolicy.DEFAULT_POLICY, false, 1000 },
                 // Only receive timeout limitation.
                 { BatchReceivePolicy.builder()
                         .timeout(50, TimeUnit.MILLISECONDS)
-                        .build(), false
+                        .build(), false, 1000
                 },
                 // Only number of messages in a single batch receive limitation.
                 { BatchReceivePolicy.builder()
                         .maxNumMessages(10)
-                        .build(), false
+                        .build(), false, 1000
                 },
                 // Number of messages and timeout limitation
                 { BatchReceivePolicy.builder()
                         .maxNumMessages(13)
                         .timeout(50, TimeUnit.MILLISECONDS)
-                        .build(), false
+                        .build(), false, 1000
                 },
                 // Size of messages and timeout limitation
                 { BatchReceivePolicy.builder()
                         .maxNumBytes(64)
                         .timeout(50, TimeUnit.MILLISECONDS)
-                        .build(), false
+                        .build(), false, 1000
+                },
+                // Number of message limitation exceed receiverQueue size
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(70)
+                        .build(), true, 50
+                },
+                // Number of message limitation exceed receiverQueue size and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(50)
+                        .timeout(10, TimeUnit.MILLISECONDS)
+                        .build(), true, 30
+                },
+                // Number of message limitation is negative and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(-10)
+                        .timeout(10, TimeUnit.MILLISECONDS)
+                        .build(), true, 10
+                },
+                // Size of message limitation is negative and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumBytes(-100)
+                        .timeout(50, TimeUnit.MILLISECONDS)
+                        .build(), true, 30
+                },
+                // Number of message limitation and size of message limitation are both negative and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(-10)
+                        .maxNumBytes(-100)
+                        .timeout(50, TimeUnit.MILLISECONDS)
+                        .build(), true, 30
+                },
+                // Number of message limitation exceed receiverQueue size
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(70)
+                        .build(), false, 50
+                },
+                // Number of message limitation exceed receiverQueue size and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(50)
+                        .timeout(50, TimeUnit.MILLISECONDS)
+                        .build(), false, 30
+                },
+                // Number of message limitation is negative and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(-10)
+                        .timeout(50, TimeUnit.MILLISECONDS)
+                        .build(), false, 30
+                },
+                // Size of message limitation is negative and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumBytes(-100)
+                        .timeout(50, TimeUnit.MILLISECONDS)
+                        .build(), false, 30
+                },
+                // Number of message limitation and size of message limitation are both negative and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(-10)
+                        .maxNumBytes(-100)
+                        .timeout(50, TimeUnit.MILLISECONDS)
+                        .build(), false, 30
                 }
         };
     }
 
     @Test(dataProvider = "batchReceivePolicy")
-    public void testBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    public void testBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         final String topic = "persistent://my-property/my-ns/batch-receive-non-partition-" + UUID.randomUUID();
-        testBatchReceive(topic, batchReceivePolicy, batchProduce);
+        testBatchReceive(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
     }
 
     @Test(dataProvider = "batchReceivePolicy")
-    public void testBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    public void testBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         final String topic = "persistent://my-property/my-ns/batch-receive-" + UUID.randomUUID();
         admin.topics().createPartitionedTopic(topic, 3);
-        testBatchReceive(topic, batchReceivePolicy, batchProduce);
+        testBatchReceive(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
     }
 
     @Test(dataProvider = "batchReceivePolicy")
-    public void testAsyncBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    public void testAsyncBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         final String topic = "persistent://my-property/my-ns/batch-receive-non-partition-async-" + UUID.randomUUID();
-        testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce);
+        testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
     }
 
     @Test(dataProvider = "batchReceivePolicy")
-    public void testAsyncBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    public void testAsyncBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         final String topic = "persistent://my-property/my-ns/batch-receive-async-" + UUID.randomUUID();
         admin.topics().createPartitionedTopic(topic, 3);
-        testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce);
+        testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
     }
 
     @Test(dataProvider = "batchReceivePolicy")
-    public void testBatchReceiveAndRedeliveryNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    public void testBatchReceiveAndRedeliveryNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         final String topic = "persistent://my-property/my-ns/batch-receive-and-redelivery-non-partition-" + UUID.randomUUID();
-        testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce);
+        testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
     }
 
     @Test(dataProvider = "batchReceivePolicy")
-    public void testBatchReceiveAndRedeliveryPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    public void testBatchReceiveAndRedeliveryPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         final String topic = "persistent://my-property/my-ns/batch-receive-and-redelivery-" + UUID.randomUUID();
         admin.topics().createPartitionedTopic(topic, 3);
-        testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce);
+        testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
     }
 
-    private void testBatchReceive(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    private void testBatchReceive(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         ProducerBuilder<String> producerBuilder = pulsarClient.newProducer(Schema.STRING).topic(topic);
         if (!batchProduce) {
             producerBuilder.enableBatching(false);
@@ -155,14 +225,14 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
                 .subscriptionName("s1")
+                .receiverQueueSize(receiverQueueSize)
                 .batchReceivePolicy(batchReceivePolicy)
                 .subscribe();
         sendMessagesAsyncAndWait(producer, 100);
         batchReceiveAndCheck(consumer, 100);
     }
 
-    private void testBatchReceiveAsync(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
-
+    private void testBatchReceiveAsync(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         if (batchReceivePolicy.getTimeoutMs() <= 0) {
             return;
         }
@@ -178,6 +248,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
                 .subscriptionName("s1")
+                .receiverQueueSize(receiverQueueSize)
                 .batchReceivePolicy(batchReceivePolicy)
                 .subscribe();
 
@@ -187,7 +258,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         latch.await();
     }
 
-    private void testBatchReceiveAndRedelivery(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    private void testBatchReceiveAndRedelivery(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         ProducerBuilder<String> producerBuilder = pulsarClient.newProducer(Schema.STRING).topic(topic);
         if (!batchProduce) {
             producerBuilder.enableBatching(false);
@@ -198,6 +269,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
                 .subscriptionName("s1")
+                .receiverQueueSize(receiverQueueSize)
                 .batchReceivePolicy(batchReceivePolicy)
                 .ackTimeout(1, TimeUnit.SECONDS)
                 .subscribe();
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
index f331b1b..3bce4c8 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
@@ -97,7 +97,7 @@ public class BatchReceivePolicy implements Serializable {
         return maxNumMessages;
     }
 
-    public long getMaxNumBytes() {
+    public int getMaxNumBytes() {
         return maxNumBytes;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 59d4041..e2b3329 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -50,6 +50,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> {
 
@@ -95,10 +97,33 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         this.schema = schema;
         this.interceptors = interceptors;
         if (conf.getBatchReceivePolicy() != null) {
-            this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            BatchReceivePolicy userBatchReceivePolicy = conf.getBatchReceivePolicy();
+            if (userBatchReceivePolicy.getMaxNumMessages() > this.maxReceiverQueueSize) {
+                this.batchReceivePolicy = BatchReceivePolicy.builder()
+                        .maxNumMessages(this.maxReceiverQueueSize)
+                        .maxNumBytes(userBatchReceivePolicy.getMaxNumBytes())
+                        .timeout((int) userBatchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS)
+                        .build();
+                log.warn("BatchReceivePolicy maxNumMessages: {} is greater than maxReceiverQueueSize: {}, " +
+                        "reset to maxReceiverQueueSize. batchReceivePolicy: {}",
+                        userBatchReceivePolicy.getMaxNumMessages(), this.maxReceiverQueueSize,
+                        this.batchReceivePolicy.toString());
+            } else if (userBatchReceivePolicy.getMaxNumMessages() <= 0 && userBatchReceivePolicy.getMaxNumBytes() <= 0) {
+                this.batchReceivePolicy = BatchReceivePolicy.builder()
+                        .maxNumMessages(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumMessages())
+                        .maxNumBytes(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumBytes())
+                        .timeout((int) userBatchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS)
+                        .build();
+                log.warn("BatchReceivePolicy maxNumMessages: {} or maxNumBytes: {} is less than 0. " +
+                        "Reset to DEFAULT_POLICY. batchReceivePolicy: {}", userBatchReceivePolicy.getMaxNumMessages(),
+                        userBatchReceivePolicy.getMaxNumBytes(), this.batchReceivePolicy.toString());
+            } else {
+                this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            }
         } else {
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
         }
+
         if (batchReceivePolicy.getTimeoutMs() > 0) {
             batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask, batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS);
         }
@@ -594,4 +619,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     }
 
     protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
+
+    private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 970e134..4972b99 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -626,6 +626,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     @Override
     protected void completeOpBatchReceive(OpBatchReceive<T> op) {
         notifyPendingBatchReceivedCallBack(op);
+        resumeReceivingFromPausedConsumersIfNeeded();
     }
 
     @Override
@@ -642,7 +643,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         try {
             seekAsync(timestamp).get();
         } catch (Exception e) {
-            throw PulsarClientException.unwrap(e);
         }
     }