You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/07 17:23:06 UTC
[pulsar] branch master updated: [java client] Bugfix prevent dup
consumers for same topic subscribe (#3746)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fb5dcd9 [java client] Bugfix prevent dup consumers for same topic subscribe (#3746)
fb5dcd9 is described below
commit fb5dcd9a58524686f2f6208d41a1e82b5bbb8111
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Thu Mar 7 14:22:59 2019 -0300
[java client] Bugfix prevent dup consumers for same topic subscribe (#3746)
Fixes #3743 issue.
Return previous instance of a consumer in the subscription processed should only
be considered with the scope of the same topic.
Modifications:
- Remove optimization of duplicated consumers for multi topics subscribe and
pattern topics subscribe, this should be handled with a different approach.
- Filter consumers for the same topic name.
- Filter consumers which are connected to broker, this is not necessary to fix
this issue but is a good thing to do.
- Add test that verifies that same subscription will allow different consumers
instance for different topics.
---
.../client/api/SimpleProducerConsumerTest.java | 51 +++++++++++++++++++++-
.../pulsar/client/impl/PulsarClientImpl.java | 13 ++----
2 files changed, 53 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 4a142ce..0f3a5b6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2994,7 +2994,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// Pull 3312: https://github.com/apache/pulsar/pull/3312
// Bugfix preventing duplicated consumers on same client cnx with shared subscription mode
@Test()
- public void testPreventDupConsumersOnClientCnx() throws Exception {
+ public void testPreventDupConsumersOnClientCnxForSingleSub() throws Exception {
final CompletableFuture<Void> future = new CompletableFuture<>();
final String topic = "persistent://my-property/my-ns/my-topic";
final String subName = "my-subscription";
@@ -3024,7 +3024,56 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
});
future.get(5, TimeUnit.SECONDS);
+ Assert.assertEquals(consumer, consumerB);
Assert.assertTrue(future.isDone());
Assert.assertFalse(future.isCompletedExceptionally());
}
+
+ @Test()
+ public void testPreventDupConsumersOnClientCnxForSingleSub_AllowDifferentTopics() throws Exception {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+ final String topic = "persistent://my-property/my-ns/my-topic";
+ final String subName = "my-subscription";
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+ Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ // This consumer should be a newly subscription since is it from a different topic
+ // even though has the same subscription name.
+ Consumer<byte[]> consumerC = pulsarClient.newConsumer().topic(topic + "-different-topic")
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ consumer.unsubscribeAsync().whenComplete((aVoid1, t1) -> {
+ if (t1 != null) {
+ future.completeExceptionally(t1);
+ return;
+ }
+
+ consumer.closeAsync().whenComplete((aVoid2, t2) -> {
+ if (t2 != null) {
+ future.completeExceptionally(t2);
+ return;
+ }
+ future.complete(null);
+ });
+ });
+
+ future.get(5, TimeUnit.SECONDS);
+ Assert.assertEquals(consumer, consumerB);
+ Assert.assertTrue(future.isDone());
+ Assert.assertFalse(future.isCompletedExceptionally());
+
+ // consumerC is a newly created subscription.
+ Assert.assertNotEquals(consumer, consumerC);
+ Assert.assertTrue(consumerC.isConnected());
+ consumerC.close();
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 039f318..257e627 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -350,11 +350,6 @@ public class PulsarClientImpl implements PulsarClient {
}
private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
- Optional<ConsumerBase<T>> subscriber = subscriptionExist(conf);
- if (subscriber.isPresent()) {
- return CompletableFuture.completedFuture(subscriber.get());
- }
-
CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
@@ -377,10 +372,6 @@ public class PulsarClientImpl implements PulsarClient {
Mode subscriptionMode = convertRegexSubscriptionMode(conf.getRegexSubscriptionMode());
TopicName destination = TopicName.get(regex);
NamespaceName namespaceName = destination.getNamespaceObject();
- Optional<ConsumerBase<T>> subscriber = subscriptionExist(conf);
- if (subscriber.isPresent()) {
- return CompletableFuture.completedFuture(subscriber.get());
- }
CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode)
@@ -688,8 +679,10 @@ public class PulsarClientImpl implements PulsarClient {
private <T> Optional<ConsumerBase<T>> subscriptionExist(ConsumerConfigurationData<?> conf) {
synchronized (consumers) {
Optional<ConsumerBase<?>> subscriber = consumers.keySet().stream()
- .filter(consumerBase -> consumerBase.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
+ .filter(c -> c.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
+ .filter(c -> conf.getTopicNames().contains(c.getTopic()))
.filter(c -> c.getSubscription().equals(conf.getSubscriptionName()))
+ .filter(Consumer::isConnected)
.findFirst();
return subscriber.map(ConsumerBase.class::cast);
}