You are viewing a plain text version of this content. The canonical link was not preserved in this plain text copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/07/06 16:26:08 UTC

[pulsar] branch master updated: Consumer is registered on dispatcher even if hash range conflicts on Key_Shared subscription (#7444)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 97ee82e  Consumer is registered on dispatcher even if hash range conflicts on Key_Shared subscription (#7444)
97ee82e is described below

commit 97ee82ed2dc337f81d7059c5d8980191d16dbfe3
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Tue Jul 7 01:25:07 2020 +0900

    Consumer is registered on dispatcher even if hash range conflicts on Key_Shared subscription (#7444)
---
 .../broker/service/AbstractDispatcherSingleActiveConsumer.java |  4 ++--
 .../NonPersistentStickyKeyDispatcherMultipleConsumers.java     | 10 ++++++++--
 .../PersistentStickyKeyDispatcherMultipleConsumers.java        | 10 ++++++++--
 .../apache/pulsar/client/api/KeySharedSubscriptionTest.java    | 10 +++++++++-
 4 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 6c5f8a7..9948dcc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -155,8 +155,6 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
             throw new ConsumerBusyException("Subscription reached max consumers limit");
         }
 
-        consumers.add(consumer);
-
         if (subscriptionType == SubType.Exclusive
                 && consumer.getKeySharedMeta() != null
                 && consumer.getKeySharedMeta().getHashRangesList() != null
@@ -168,6 +166,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
             isKeyHashRangeFiltered = false;
         }
 
+        consumers.add(consumer);
+
         if (!pickAndScheduleActiveConsumer()) {
             // the active consumer is not changed
             Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 32cce87..37b29da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -47,7 +47,13 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
     @Override
     public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
         super.addConsumer(consumer);
-        selector.addConsumer(consumer);
+        try {
+            selector.addConsumer(consumer);
+        } catch (BrokerServiceException e) {
+            consumerSet.removeAll(consumer);
+            consumerList.remove(consumer);
+            throw e;
+        }
     }
 
     @Override
@@ -99,4 +105,4 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
             TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index a4a532f..420552c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -93,7 +93,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
     @Override
     public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
         super.addConsumer(consumer);
-        selector.addConsumer(consumer);
+        try {
+            selector.addConsumer(consumer);
+        } catch (BrokerServiceException e) {
+            consumerSet.removeAll(consumer);
+            consumerList.remove(consumer);
+            throw e;
+        }
 
         // If this was the 1st consumer, or if all the messages are already acked, then we
         // don't need to do anything special
@@ -294,4 +300,4 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
     private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
 
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 511f8c2..966dfda 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -658,7 +658,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     @Test
     public void testHashRangeConflict() throws PulsarClientException {
         this.conf.setSubscriptionKeySharedEnable(true);
-        final String topic = "testHashRangeConflict-" + UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/testHashRangeConflict-" + UUID.randomUUID().toString();
         final String sub = "test";
 
         Consumer<String> consumer1 = createFixedHashRangesConsumer(topic, sub, Range.of(0,99), Range.of(400, 65535));
@@ -667,6 +667,10 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         Consumer<String> consumer2 = createFixedHashRangesConsumer(topic, sub, Range.of(100,399));
         Assert.assertTrue(consumer2.isConnected());
 
+        PersistentStickyKeyDispatcherMultipleConsumers dispatcher = (PersistentStickyKeyDispatcherMultipleConsumers) pulsar
+                .getBrokerService().getTopicReference(topic).get().getSubscription(sub).getDispatcher();
+        Assert.assertEquals(dispatcher.getConsumers().size(), 2);
+
         try {
             createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535));
             Assert.fail("Should failed with conflict range.");
@@ -679,7 +683,9 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         } catch (PulsarClientException.ConsumerAssignException ignore) {
         }
 
+        Assert.assertEquals(dispatcher.getConsumers().size(), 2);
         consumer1.close();
+        Assert.assertEquals(dispatcher.getConsumers().size(), 1);
 
         try {
             createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535));
@@ -705,9 +711,11 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         Consumer<String> consumer4 = createFixedHashRangesConsumer(topic, sub, Range.of(50,99));
         Assert.assertTrue(consumer4.isConnected());
 
+        Assert.assertEquals(dispatcher.getConsumers().size(), 3);
         consumer2.close();
         consumer3.close();
         consumer4.close();
+        Assert.assertFalse(dispatcher.isConsumerConnected());
     }
 
     private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException {