You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:00:57 UTC
[pulsar] 05/09: Reduce redundant FLOW requests for non-durable
multi-topics consumer (#11802)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0669247047f8050e6dae83d9f608f3a5822520b3
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Sep 2 13:09:12 2021 +0800
Reduce redundant FLOW requests for non-durable multi-topics consumer (#11802)
### Motivation
https://github.com/apache/pulsar/pull/3960 fixed the bug that reader will get stuck if it's reading a partition of a partitioned topic. The fix is using `isDurable` to check whether the consumer is a reader's internal consumer because it used `partitionIndex` to check whether the target topic is a partition while reader's `partitionIndex` is already set. However, for a non-durable multi-topics consumer, `isDurable` is false and each internal consumer will send FLOW request once the co [...]
After https://github.com/apache/pulsar/pull/4591 introduced `hasParentConsumer` field, the check works for even a reader without the `isDurable` check.
### Modifications
- Remove the check for `isDurable` before sending FLOW request and update the related comment.
- Add a test for non-durable multi-topics consumer to verify the number of FLOW requests is the topics number, not the twice the topics number.
### Verifying this change
- [x] Make sure that the change passes the CI checks.
This change added `NonDurableSubscriptionTest#testFlowCountForMultiTopics` and the existing test `ReaderTest#testReadFromPartition` added in #3960 can also pass after this change.
This change added tests and can be verified as follows:
(cherry picked from commit 1303e7dc7a70f10c5da015804184caecd218e7e3)
---
.../client/api/NonDurableSubscriptionTest.java | 57 +++++++++++++++++++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 4 +-
2 files changed, 57 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
index 09f6f39..bef6274 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
@@ -21,18 +21,25 @@ package org.apache.pulsar.client.api;
import java.lang.reflect.Field;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.PulsarChannelInitializer;
+import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.api.proto.CommandFlow;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNull;
@@ -42,6 +49,8 @@ import static org.testng.AssertJUnit.assertTrue;
@Slf4j
public class NonDurableSubscriptionTest extends ProducerConsumerBase {
+ private final AtomicInteger numFlow = new AtomicInteger(0);
+
@BeforeMethod
@Override
protected void setup() throws Exception {
@@ -56,6 +65,34 @@ public class NonDurableSubscriptionTest extends ProducerConsumerBase {
super.internalCleanup();
}
+ @Override
+ protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
+ return new PulsarService(conf) {
+
+ @Override
+ protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
+ BrokerService broker = new BrokerService(this, ioEventLoopGroup);
+ broker.setPulsarChannelInitializerFactory(
+ (_pulsar, tls) -> {
+ return new PulsarChannelInitializer(_pulsar, tls) {
+ @Override
+ protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception {
+ return new ServerCnx(pulsar) {
+
+ @Override
+ protected void handleFlow(CommandFlow flow) {
+ super.handleFlow(flow);
+ numFlow.incrementAndGet();
+ }
+ };
+ }
+ };
+ });
+ return broker;
+ }
+ };
+ }
+
@Test
public void testNonDurableSubscription() throws Exception {
String topicName = "persistent://my-property/my-ns/nonDurable-topic1";
@@ -188,4 +225,22 @@ public class NonDurableSubscriptionTest extends ProducerConsumerBase {
}
}
+
+ @Test
+ public void testFlowCountForMultiTopics() throws Exception {
+ String topicName = "persistent://my-property/my-ns/test-flow-count";
+ int numPartitions = 5;
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
+ numFlow.set(0);
+
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("my-nonDurable-subscriber")
+ .subscriptionMode(SubscriptionMode.NonDurable)
+ .subscribe();
+ consumer.receive(1, TimeUnit.SECONDS);
+ consumer.close();
+
+ assertEquals(numFlow.get(), numPartitions);
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 37856aa..b7fa039 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -755,9 +755,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
boolean firstTimeConnect = subscribeFuture.complete(this);
// if the consumer is not partitioned or is re-connected and is partitioned, we send the flow
// command to receive messages.
- // For readers too (isDurable==false), the partition idx will be set though we have to
- // send available permits immediately after establishing the reader session
- if (!(firstTimeConnect && hasParentConsumer && isDurable) && conf.getReceiverQueueSize() != 0) {
+ if (!(firstTimeConnect && hasParentConsumer) && conf.getReceiverQueueSize() != 0) {
increaseAvailablePermits(cnx, conf.getReceiverQueueSize());
}
}).exceptionally((e) -> {