You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:00:57 UTC

[pulsar] 05/09: Reduce redundant FLOW requests for non-durable multi-topics consumer (#11802)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0669247047f8050e6dae83d9f608f3a5822520b3
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Sep 2 13:09:12 2021 +0800

    Reduce redundant FLOW requests for non-durable multi-topics consumer (#11802)
    
    ### Motivation
    
    https://github.com/apache/pulsar/pull/3960 fixed a bug where a reader gets stuck when it reads a single partition of a partitioned topic. That fix used `isDurable` to detect whether the consumer is a reader's internal consumer, because the previous check relied on `partitionIndex` to decide whether the target topic is a partition, and a reader's `partitionIndex` is already set. However, for a non-durable multi-topics consumer, `isDurable` is false and each internal consumer will send a FLOW request once the co [...]
    
    After https://github.com/apache/pulsar/pull/4591 introduced the `hasParentConsumer` field, the check works even for a reader, without needing the `isDurable` check.
    
    ### Modifications
    
    - Remove the check for `isDurable` before sending FLOW request and update the related comment.
    - Add a test for a non-durable multi-topics consumer to verify that the number of FLOW requests equals the number of topics, not twice the number of topics.
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    This change added `NonDurableSubscriptionTest#testFlowCountForMultiTopics`; the existing test `ReaderTest#testReadFromPartition`, added in #3960, also passes after this change.
    The change can be verified by running these two tests.
    
    (cherry picked from commit 1303e7dc7a70f10c5da015804184caecd218e7e3)
---
 .../client/api/NonDurableSubscriptionTest.java     | 57 +++++++++++++++++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  4 +-
 2 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
index 09f6f39..bef6274 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
@@ -21,18 +21,25 @@ package org.apache.pulsar.client.api;
 import java.lang.reflect.Field;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.PulsarChannelInitializer;
+import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.api.proto.CommandFlow;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertNull;
@@ -42,6 +49,8 @@ import static org.testng.AssertJUnit.assertTrue;
 @Slf4j
 public class NonDurableSubscriptionTest  extends ProducerConsumerBase {
 
+    private final AtomicInteger numFlow = new AtomicInteger(0);
+
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
@@ -56,6 +65,34 @@ public class NonDurableSubscriptionTest  extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @Override
+    protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
+        return new PulsarService(conf) {
+
+            @Override
+            protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
+                BrokerService broker = new BrokerService(this, ioEventLoopGroup);
+                broker.setPulsarChannelInitializerFactory(
+                        (_pulsar, tls) -> {
+                            return new PulsarChannelInitializer(_pulsar, tls) {
+                                @Override
+                                protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception {
+                                    return new ServerCnx(pulsar) {
+
+                                        @Override
+                                        protected void handleFlow(CommandFlow flow) {
+                                            super.handleFlow(flow);
+                                            numFlow.incrementAndGet();
+                                        }
+                                    };
+                                }
+                            };
+                        });
+                return broker;
+            }
+        };
+    }
+
     @Test
     public void testNonDurableSubscription() throws Exception {
         String topicName = "persistent://my-property/my-ns/nonDurable-topic1";
@@ -188,4 +225,22 @@ public class NonDurableSubscriptionTest  extends ProducerConsumerBase {
         }
 
     }
+
+    @Test
+    public void testFlowCountForMultiTopics() throws Exception {
+        String topicName = "persistent://my-property/my-ns/test-flow-count";
+        int numPartitions = 5;
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
+        numFlow.set(0);
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("my-nonDurable-subscriber")
+                .subscriptionMode(SubscriptionMode.NonDurable)
+                .subscribe();
+        consumer.receive(1, TimeUnit.SECONDS);
+        consumer.close();
+
+        assertEquals(numFlow.get(), numPartitions);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 37856aa..b7fa039 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -755,9 +755,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             boolean firstTimeConnect = subscribeFuture.complete(this);
             // if the consumer is not partitioned or is re-connected and is partitioned, we send the flow
             // command to receive messages.
-            // For readers too (isDurable==false), the partition idx will be set though we have to
-            // send available permits immediately after establishing the reader session
-            if (!(firstTimeConnect && hasParentConsumer && isDurable) && conf.getReceiverQueueSize() != 0) {
+            if (!(firstTimeConnect && hasParentConsumer) && conf.getReceiverQueueSize() != 0) {
                 increaseAvailablePermits(cnx, conf.getReceiverQueueSize());
             }
         }).exceptionally((e) -> {