You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/12 13:44:56 UTC
[pulsar] 05/17: fix consumer stuck when batchReceivePolicy
maxNumMessages > maxReceiverQueueSize (#6862)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c6bbfc6d59c83ee78688b4b7dd177277aa8e4c72
Author: hangc0276 <ha...@163.com>
AuthorDate: Mon May 11 08:41:11 2020 +0800
fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize (#6862)
Fix #6854
### Bug description
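The consumer gets stuck because of the check in `hasEnoughMessagesForBatchReceive`, which can never succeed on message count when `maxNumMessages` exceeds the receiver queue size: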
```
/**
 * Reports whether enough messages are buffered to complete a pending batch receive.
 *
 * @return {@code false} when neither policy limit is positive; otherwise {@code true} once
 *         the number of queued messages reaches {@code maxNumMessages} or the value read from
 *         {@code INCOMING_MESSAGES_SIZE_UPDATER} reaches {@code maxNumBytes}
 */
protected boolean hasEnoughMessagesForBatchReceive() {
// With both limits disabled (non-positive) this check never reports a full batch.
if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
return false;
}
// Each limit is only consulted when it is positive.
// NOTE(review): per this commit's description, when maxNumMessages is larger than the
// receiver queue size the count condition is never satisfied and the consumer gets stuck —
// presumably because incomingMessages cannot grow beyond the receiver queue size.
return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
|| (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes());
}
```
### Changes
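When the batchReceivePolicy `maxNumMessages` is greater than `maxReceiverQueueSize`, we cap `maxNumMessages` at `maxReceiverQueueSize` so that the consumer does not get stuck in the `hasEnoughMessagesForBatchReceive` check. When both `maxNumMessages` and `maxNumBytes` are non-positive, they are reset to the `DEFAULT_POLICY` values while the user's timeout is kept. In both cases a warning is logged.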
* fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize
* log a warning when the policy is adjusted and add test cases
(cherry picked from commit 561868d4c441d654ab795a9386d8e9a5e28f03dd)
---
.../client/api/ConsumerBatchReceiveTest.java | 124 ++++++++++++++++-----
.../pulsar/client/api/BatchReceivePolicy.java | 2 +-
.../apache/pulsar/client/impl/ConsumerBase.java | 29 ++++-
.../client/impl/MultiTopicsConsumerImpl.java | 2 +-
4 files changed, 128 insertions(+), 29 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
index 19ec983..54a6e52 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
@@ -55,96 +55,166 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
return new Object[][] {
// Default batch receive policy.
- { BatchReceivePolicy.DEFAULT_POLICY, true },
+ { BatchReceivePolicy.DEFAULT_POLICY, true, 1000},
// Only receive timeout limitation.
{ BatchReceivePolicy.builder()
.timeout(50, TimeUnit.MILLISECONDS)
- .build(), true
+ .build(), true, 1000
},
// Only number of messages in a single batch receive limitation.
{ BatchReceivePolicy.builder()
.maxNumMessages(10)
- .build(), true
+ .build(), true, 1000
},
// Number of messages and timeout limitation
{ BatchReceivePolicy.builder()
.maxNumMessages(13)
.timeout(50, TimeUnit.MILLISECONDS)
- .build(), true
+ .build(), true, 1000
},
// Size of messages and timeout limitation
{ BatchReceivePolicy.builder()
.maxNumBytes(64)
.timeout(50, TimeUnit.MILLISECONDS)
- .build(), true
+ .build(), true, 1000
},
// Default batch receive policy.
- { BatchReceivePolicy.DEFAULT_POLICY, false },
+ { BatchReceivePolicy.DEFAULT_POLICY, false, 1000 },
// Only receive timeout limitation.
{ BatchReceivePolicy.builder()
.timeout(50, TimeUnit.MILLISECONDS)
- .build(), false
+ .build(), false, 1000
},
// Only number of messages in a single batch receive limitation.
{ BatchReceivePolicy.builder()
.maxNumMessages(10)
- .build(), false
+ .build(), false, 1000
},
// Number of messages and timeout limitation
{ BatchReceivePolicy.builder()
.maxNumMessages(13)
.timeout(50, TimeUnit.MILLISECONDS)
- .build(), false
+ .build(), false, 1000
},
// Size of messages and timeout limitation
{ BatchReceivePolicy.builder()
.maxNumBytes(64)
.timeout(50, TimeUnit.MILLISECONDS)
- .build(), false
+ .build(), false, 1000
+ },
+ // Number of message limitation exceed receiverQueue size
+ {
+ BatchReceivePolicy.builder()
+ .maxNumMessages(70)
+ .build(), true, 50
+ },
+ // Number of message limitation exceed receiverQueue size and timeout limitation
+ {
+ BatchReceivePolicy.builder()
+ .maxNumMessages(50)
+ .timeout(10, TimeUnit.MILLISECONDS)
+ .build(), true, 30
+ },
+ // Number of message limitation is negative and timeout limitation
+ {
+ BatchReceivePolicy.builder()
+ .maxNumMessages(-10)
+ .timeout(10, TimeUnit.MILLISECONDS)
+ .build(), true, 10
+ },
+ // Size of message limitation is negative and timeout limitation
+ {
+ BatchReceivePolicy.builder()
+ .maxNumBytes(-100)
+ .timeout(50, TimeUnit.MILLISECONDS)
+ .build(), true, 30
+ },
+ // Number of message limitation and size of message limitation are both negative and timeout limitation
+ {
+ BatchReceivePolicy.builder()
+ .maxNumMessages(-10)
+ .maxNumBytes(-100)
+ .timeout(50, TimeUnit.MILLISECONDS)
+ .build(), true, 30
+ },
+ // Number of message limitation exceed receiverQueue size
+ {
+ BatchReceivePolicy.builder()
+ .maxNumMessages(70)
+ .build(), false, 50
+ },
+ // Number of message limitation exceed receiverQueue size and timeout limitation
+ {
+ BatchReceivePolicy.builder()
+ .maxNumMessages(50)
+ .timeout(50, TimeUnit.MILLISECONDS)
+ .build(), false, 30
+ },
+ // Number of message limitation is negative and timeout limitation
+ {
+ BatchReceivePolicy.builder()
+ .maxNumMessages(-10)
+ .timeout(50, TimeUnit.MILLISECONDS)
+ .build(), false, 30
+ },
+ // Size of message limitation is negative and timeout limitation
+ {
+ BatchReceivePolicy.builder()
+ .maxNumBytes(-100)
+ .timeout(50, TimeUnit.MILLISECONDS)
+ .build(), false, 30
+ },
+ // Number of message limitation and size of message limitation are both negative and timeout limitation
+ {
+ BatchReceivePolicy.builder()
+ .maxNumMessages(-10)
+ .maxNumBytes(-100)
+ .timeout(50, TimeUnit.MILLISECONDS)
+ .build(), false, 30
}
};
}
@Test(dataProvider = "batchReceivePolicy")
- public void testBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+ public void testBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
final String topic = "persistent://my-property/my-ns/batch-receive-non-partition-" + UUID.randomUUID();
- testBatchReceive(topic, batchReceivePolicy, batchProduce);
+ testBatchReceive(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
}
@Test(dataProvider = "batchReceivePolicy")
- public void testBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+ public void testBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
final String topic = "persistent://my-property/my-ns/batch-receive-" + UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 3);
- testBatchReceive(topic, batchReceivePolicy, batchProduce);
+ testBatchReceive(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
}
@Test(dataProvider = "batchReceivePolicy")
- public void testAsyncBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+ public void testAsyncBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
final String topic = "persistent://my-property/my-ns/batch-receive-non-partition-async-" + UUID.randomUUID();
- testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce);
+ testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
}
@Test(dataProvider = "batchReceivePolicy")
- public void testAsyncBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+ public void testAsyncBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
final String topic = "persistent://my-property/my-ns/batch-receive-async-" + UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 3);
- testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce);
+ testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
}
@Test(dataProvider = "batchReceivePolicy")
- public void testBatchReceiveAndRedeliveryNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+ public void testBatchReceiveAndRedeliveryNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
final String topic = "persistent://my-property/my-ns/batch-receive-and-redelivery-non-partition-" + UUID.randomUUID();
- testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce);
+ testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
}
@Test(dataProvider = "batchReceivePolicy")
- public void testBatchReceiveAndRedeliveryPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+ public void testBatchReceiveAndRedeliveryPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
final String topic = "persistent://my-property/my-ns/batch-receive-and-redelivery-" + UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 3);
- testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce);
+ testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
}
- private void testBatchReceive(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+ private void testBatchReceive(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
ProducerBuilder<String> producerBuilder = pulsarClient.newProducer(Schema.STRING).topic(topic);
if (!batchProduce) {
producerBuilder.enableBatching(false);
@@ -155,14 +225,14 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("s1")
+ .receiverQueueSize(receiverQueueSize)
.batchReceivePolicy(batchReceivePolicy)
.subscribe();
sendMessagesAsyncAndWait(producer, 100);
batchReceiveAndCheck(consumer, 100);
}
- private void testBatchReceiveAsync(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
-
+ private void testBatchReceiveAsync(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
if (batchReceivePolicy.getTimeoutMs() <= 0) {
return;
}
@@ -178,6 +248,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("s1")
+ .receiverQueueSize(receiverQueueSize)
.batchReceivePolicy(batchReceivePolicy)
.subscribe();
@@ -187,7 +258,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
latch.await();
}
- private void testBatchReceiveAndRedelivery(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+ private void testBatchReceiveAndRedelivery(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
ProducerBuilder<String> producerBuilder = pulsarClient.newProducer(Schema.STRING).topic(topic);
if (!batchProduce) {
producerBuilder.enableBatching(false);
@@ -198,6 +269,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("s1")
+ .receiverQueueSize(receiverQueueSize)
.batchReceivePolicy(batchReceivePolicy)
.ackTimeout(1, TimeUnit.SECONDS)
.subscribe();
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
index f331b1b..3bce4c8 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
@@ -97,7 +97,7 @@ public class BatchReceivePolicy implements Serializable {
return maxNumMessages;
}
- public long getMaxNumBytes() {
+ public int getMaxNumBytes() {
return maxNumBytes;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 59d4041..e2b3329 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -50,6 +50,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> {
@@ -95,10 +97,33 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
this.schema = schema;
this.interceptors = interceptors;
if (conf.getBatchReceivePolicy() != null) {
- this.batchReceivePolicy = conf.getBatchReceivePolicy();
+ BatchReceivePolicy userBatchReceivePolicy = conf.getBatchReceivePolicy();
+ if (userBatchReceivePolicy.getMaxNumMessages() > this.maxReceiverQueueSize) {
+ this.batchReceivePolicy = BatchReceivePolicy.builder()
+ .maxNumMessages(this.maxReceiverQueueSize)
+ .maxNumBytes(userBatchReceivePolicy.getMaxNumBytes())
+ .timeout((int) userBatchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS)
+ .build();
+ log.warn("BatchReceivePolicy maxNumMessages: {} is greater than maxReceiverQueueSize: {}, " +
+ "reset to maxReceiverQueueSize. batchReceivePolicy: {}",
+ userBatchReceivePolicy.getMaxNumMessages(), this.maxReceiverQueueSize,
+ this.batchReceivePolicy.toString());
+ } else if (userBatchReceivePolicy.getMaxNumMessages() <= 0 && userBatchReceivePolicy.getMaxNumBytes() <= 0) {
+ this.batchReceivePolicy = BatchReceivePolicy.builder()
+ .maxNumMessages(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumMessages())
+ .maxNumBytes(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumBytes())
+ .timeout((int) userBatchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS)
+ .build();
+ log.warn("BatchReceivePolicy maxNumMessages: {} or maxNumBytes: {} is less than 0. " +
+ "Reset to DEFAULT_POLICY. batchReceivePolicy: {}", userBatchReceivePolicy.getMaxNumMessages(),
+ userBatchReceivePolicy.getMaxNumBytes(), this.batchReceivePolicy.toString());
+ } else {
+ this.batchReceivePolicy = conf.getBatchReceivePolicy();
+ }
} else {
this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
}
+
if (batchReceivePolicy.getTimeoutMs() > 0) {
batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask, batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS);
}
@@ -594,4 +619,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
}
protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
+
+ private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 970e134..4972b99 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -626,6 +626,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
@Override
protected void completeOpBatchReceive(OpBatchReceive<T> op) {
notifyPendingBatchReceivedCallBack(op);
+ resumeReceivingFromPausedConsumersIfNeeded();
}
@Override
@@ -642,7 +643,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
try {
seekAsync(timestamp).get();
} catch (Exception e) {
- throw PulsarClientException.unwrap(e);
}
}