You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:53:07 UTC
[pulsar] 01/10: Consumer is registered on dispatcher even if hash range conflicts on Key_Shared subscription (#7444)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fc1a8ee4c829f663091b3d0daf91b14661539cf5
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Tue Jul 7 01:25:07 2020 +0900
Consumer is registered on dispatcher even if hash range conflicts on Key_Shared subscription (#7444)
(cherry picked from commit 97ee82ed2dc337f81d7059c5d8980191d16dbfe3)
---
.../broker/service/AbstractDispatcherSingleActiveConsumer.java | 4 ++--
.../NonPersistentStickyKeyDispatcherMultipleConsumers.java | 10 ++++++++--
.../PersistentStickyKeyDispatcherMultipleConsumers.java | 10 ++++++++--
.../apache/pulsar/client/api/KeySharedSubscriptionTest.java | 10 +++++++++-
4 files changed, 27 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 6c5f8a7..9948dcc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -155,8 +155,6 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
throw new ConsumerBusyException("Subscription reached max consumers limit");
}
- consumers.add(consumer);
-
if (subscriptionType == SubType.Exclusive
&& consumer.getKeySharedMeta() != null
&& consumer.getKeySharedMeta().getHashRangesList() != null
@@ -168,6 +166,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
isKeyHashRangeFiltered = false;
}
+ consumers.add(consumer);
+
if (!pickAndScheduleActiveConsumer()) {
// the active consumer is not changed
Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 32cce87..37b29da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -47,7 +47,13 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
super.addConsumer(consumer);
- selector.addConsumer(consumer);
+ try {
+ selector.addConsumer(consumer);
+ } catch (BrokerServiceException e) {
+ consumerSet.removeAll(consumer);
+ consumerList.remove(consumer);
+ throw e;
+ }
}
@Override
@@ -99,4 +105,4 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
}
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index a4a532f..420552c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -93,7 +93,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
super.addConsumer(consumer);
- selector.addConsumer(consumer);
+ try {
+ selector.addConsumer(consumer);
+ } catch (BrokerServiceException e) {
+ consumerSet.removeAll(consumer);
+ consumerList.remove(consumer);
+ throw e;
+ }
// If this was the 1st consumer, or if all the messages are already acked, then we
// don't need to do anything special
@@ -294,4 +300,4 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 610c4d1..02073ea 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -658,7 +658,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test
public void testHashRangeConflict() throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(true);
- final String topic = "testHashRangeConflict-" + UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/testHashRangeConflict-" + UUID.randomUUID().toString();
final String sub = "test";
Consumer<String> consumer1 = createFixedHashRangesConsumer(topic, sub, Range.of(0,99), Range.of(400, 65535));
@@ -667,6 +667,10 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
Consumer<String> consumer2 = createFixedHashRangesConsumer(topic, sub, Range.of(100,399));
Assert.assertTrue(consumer2.isConnected());
+ PersistentStickyKeyDispatcherMultipleConsumers dispatcher = (PersistentStickyKeyDispatcherMultipleConsumers) pulsar
+ .getBrokerService().getTopicReference(topic).get().getSubscription(sub).getDispatcher();
+ Assert.assertEquals(dispatcher.getConsumers().size(), 2);
+
try {
createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535));
Assert.fail("Should failed with conflict range.");
@@ -679,7 +683,9 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
} catch (PulsarClientException.ConsumerAssignException ignore) {
}
+ Assert.assertEquals(dispatcher.getConsumers().size(), 2);
consumer1.close();
+ Assert.assertEquals(dispatcher.getConsumers().size(), 1);
try {
createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535));
@@ -705,9 +711,11 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
Consumer<String> consumer4 = createFixedHashRangesConsumer(topic, sub, Range.of(50,99));
Assert.assertTrue(consumer4.isConnected());
+ Assert.assertEquals(dispatcher.getConsumers().size(), 3);
consumer2.close();
consumer3.close();
consumer4.close();
+ Assert.assertFalse(dispatcher.isConsumerConnected());
}
@Test