You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/06 08:07:00 UTC
[pulsar] branch master updated: Fix Consumer listener does not
respect receiver queue size (#11455)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 09944d9 Fix Consumer listener does not respect receiver queue size (#11455)
09944d9 is described below
commit 09944d9891e0a433da9c7a1cf7b80acf5b3c1116
Author: GuoJiwei <te...@apache.org>
AuthorDate: Fri Aug 6 16:06:18 2021 +0800
Fix Consumer listener does not respect receiver queue size (#11455)
Fixes #11008
---
.../client/api/KeySharedSubscriptionTest.java | 6 +-
.../client/api/SimpleProducerConsumerTest.java | 93 ++++++++++++++++++----
.../apache/pulsar/client/impl/ConsumerBase.java | 31 +++++---
3 files changed, 101 insertions(+), 29 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 4142edb..967094a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -525,9 +525,9 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
// Since only 1 out of 10 consumers is stuck, we should be able to receive ~90% messages,
// plus or minus for some skew in the key distribution.
- Thread.sleep(5000);
-
- assertEquals((double) receivedMessages.get(), N * 0.9, N * 0.3);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals((double) receivedMessages.get(), N * 0.9, N * 0.3);
+ });
} finally {
for (PulsarClient c : clients) {
c.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 77292f8..2eebbf8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4125,11 +4125,14 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
* <p>
* Test starts consumer with 10 partitions where one of the partition listener gets blocked but that will not impact
* processing of other 9 partitions and they will be processed successfully.
+ * As of involved #11455(Fix Consumer listener does not respect receiver queue size),
+ * This test has changed the purpose that different thread run the messageListener. Because messageListener has to
+ * be called one by one, it's possible to run by the same one thread.
*
* @throws Exception
*/
@Test(timeOut = 20000)
- public void testPartitionTopicsOnSeparateListner() throws Exception {
+ public void testPartitionTopicsOnSeparateListener() throws Exception {
log.info("-- Starting {} test --", methodName);
final String topicName = "persistent://my-property/my-ns/one-partitioned-topic";
@@ -4145,20 +4148,12 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// each partition
int totalMessages = partitions * 2;
- CountDownLatch latch = new CountDownLatch(totalMessages - 2);
- CountDownLatch blockedMessageLatch = new CountDownLatch(1);
+ CountDownLatch latch = new CountDownLatch(1);
AtomicInteger count = new AtomicInteger();
Set<String> listenerThreads = Sets.newConcurrentHashSet();
MessageListener<byte[]> messageListener = (c, m) -> {
- if (count.incrementAndGet() == 1) {
- try {
- // blocking one of the partition's listener thread will not impact other topics
- blockedMessageLatch.await();
- } catch (InterruptedException e) {
- // Ok
- }
- } else {
+ if (count.incrementAndGet() == totalMessages) {
latch.countDown();
}
listenerThreads.add(Thread.currentThread().getName());
@@ -4177,9 +4172,79 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
producer1.newMessage().value(("one-partitioned-topic-value-producer1-" + i).getBytes(UTF_8)).send();
}
latch.await();
- assertEquals(listenerThreads.size(), partitions - 1);
- // unblock the listener thread
- blockedMessageLatch.countDown();
+ assertTrue(listenerThreads.size() >= 1);
log.info("-- Exiting {} test --", methodName);
}
+
+ @Test(timeOut = 30000)
+ public void testShareConsumerWithMessageListener() throws Exception {
+ String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();
+ int total = 200;
+ Set<Integer> resultSet = Sets.newConcurrentHashSet();
+ AtomicInteger r1 = new AtomicInteger(0);
+ AtomicInteger r2 = new AtomicInteger(0);
+
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .maxPendingMessages(500)
+ .enableBatching(false)
+ .create();
+
+ @Cleanup
+ Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("shared")
+ .subscriptionType(SubscriptionType.Shared)
+ .receiverQueueSize(10)
+ .consumerName("c1")
+ .messageListener((MessageListener<Integer>) (consumer, msg) -> {
+ log.info("c1 received : {}", msg.getValue());
+ try {
+ resultSet.add(msg.getValue());
+ r1.incrementAndGet();
+ consumer.acknowledge(msg);
+ Thread.sleep(10);
+ } catch (InterruptedException ignore) {
+ //
+ } catch (PulsarClientException ex) {
+ log.error("c1 acknowledge error", ex);
+ }
+ })
+ .subscribe();
+
+ for (int i = 0; i < total; i++) {
+ producer.newMessage()
+ .value(i)
+ .send();
+ }
+
+ @Cleanup
+ Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("shared")
+ .subscriptionType(SubscriptionType.Shared)
+ .receiverQueueSize(10)
+ .consumerName("c2")
+ .messageListener((MessageListener<Integer>) (consumer, msg) -> {
+ log.info("c2 received : {}", msg.getValue());
+ try {
+ resultSet.add(msg.getValue());
+ r2.incrementAndGet();
+ consumer.acknowledge(msg);
+ Thread.sleep(10);
+ } catch (InterruptedException ignore) {
+ //
+ } catch (PulsarClientException ex) {
+ log.error("c2 acknowledge error", ex);
+ }
+ })
+ .subscribe();
+
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(r1.get() >= 1);
+ assertTrue(r2.get() >= 1);
+ assertEquals(resultSet.size(), total);
+ });
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 5c02904..9e3757b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import io.netty.util.Timeout;
@@ -86,6 +87,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected volatile long incomingMessagesSize = 0;
protected volatile Timeout batchReceiveTimeout = null;
protected final Lock reentrantLock = new ReentrantLock();
+ private final AtomicInteger executorQueueSize = new AtomicInteger(0);
protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorProvider executorProvider,
@@ -895,24 +897,26 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected void triggerListener() {
// Trigger the notification on the message listener in a separate thread to avoid blocking the networking
// thread while the message processing happens
- Message<T> msg;
- do {
- try {
- msg = internalReceive(0, TimeUnit.MILLISECONDS);
+ try {
+ // Control executor to call MessageListener one by one.
+ if (executorQueueSize.get() < 1) {
+ final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
if (msg != null) {
- final Message<T> finalMsg = msg;
+ executorQueueSize.incrementAndGet();
if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
- executorProvider.getExecutor(peekMessageKey(finalMsg)).execute(() ->
- callMessageListener(finalMsg));
+ executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
+ callMessageListener(msg));
} else {
- getExecutor(msg).execute(() -> callMessageListener(finalMsg));
+ getExecutor(msg).execute(() -> {
+ callMessageListener(msg);
+ });
}
}
- } catch (PulsarClientException e) {
- log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
- return;
}
- } while (msg != null);
+ } catch (PulsarClientException e) {
+ log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
+ return;
+ }
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
@@ -929,6 +933,9 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
} catch (Throwable t) {
log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription,
msg.getMessageId(), t);
+ } finally {
+ executorQueueSize.decrementAndGet();
+ triggerListener();
}
}