You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/07 17:23:06 UTC

[pulsar] branch master updated: [java client] Bugfix prevent dup consumers for same topic subscribe (#3746)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fb5dcd9  [java client] Bugfix prevent dup consumers for same topic subscribe (#3746)
fb5dcd9 is described below

commit fb5dcd9a58524686f2f6208d41a1e82b5bbb8111
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Thu Mar 7 14:22:59 2019 -0300

    [java client] Bugfix prevent dup consumers for same topic subscribe (#3746)
    
    Fixes #3743 issue.
    
    Return previous instance of a consumer in the subscription processed should only
    be considered with the scope of the same topic.
    
    Modifications:
    
      - Remove optimization of duplicated consumers for multi topics subscribe and
        pattern topics subscribe, this should be handled with a different approach.
      - Filter consumers for the same topic name.
      - Filter consumers which are connected to broker, this is not necessary to fix
        this issue but is a good thing to do.
      - Add test that verifies that same subscription will allow different consumers
        instance for different topics.
---
 .../client/api/SimpleProducerConsumerTest.java     | 51 +++++++++++++++++++++-
 .../pulsar/client/impl/PulsarClientImpl.java       | 13 ++----
 2 files changed, 53 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 4a142ce..0f3a5b6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2994,7 +2994,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
     // Pull 3312: https://github.com/apache/pulsar/pull/3312
     // Bugfix preventing duplicated consumers on same client cnx with shared subscription mode
     @Test()
-    public void testPreventDupConsumersOnClientCnx() throws Exception {
+    public void testPreventDupConsumersOnClientCnxForSingleSub() throws Exception {
         final CompletableFuture<Void> future = new CompletableFuture<>();
         final String topic = "persistent://my-property/my-ns/my-topic";
         final String subName = "my-subscription";
@@ -3024,7 +3024,56 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         });
 
         future.get(5, TimeUnit.SECONDS);
+        Assert.assertEquals(consumer, consumerB);
         Assert.assertTrue(future.isDone());
         Assert.assertFalse(future.isCompletedExceptionally());
     }
+
+    @Test()
+    public void testPreventDupConsumersOnClientCnxForSingleSub_AllowDifferentTopics() throws Exception {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        final String topic = "persistent://my-property/my-ns/my-topic";
+        final String subName = "my-subscription";
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+        Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        // This consumer should be a newly subscription since is it from a different topic
+        // even though has the same subscription name.
+        Consumer<byte[]> consumerC = pulsarClient.newConsumer().topic(topic + "-different-topic")
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        consumer.unsubscribeAsync().whenComplete((aVoid1, t1) -> {
+            if (t1 != null) {
+                future.completeExceptionally(t1);
+                return;
+            }
+
+            consumer.closeAsync().whenComplete((aVoid2, t2) -> {
+                if (t2 != null) {
+                    future.completeExceptionally(t2);
+                    return;
+                }
+                future.complete(null);
+            });
+        });
+
+        future.get(5, TimeUnit.SECONDS);
+        Assert.assertEquals(consumer, consumerB);
+        Assert.assertTrue(future.isDone());
+        Assert.assertFalse(future.isCompletedExceptionally());
+
+        // consumerC is a newly created subscription.
+        Assert.assertNotEquals(consumer, consumerC);
+        Assert.assertTrue(consumerC.isConnected());
+        consumerC.close();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 039f318..257e627 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -350,11 +350,6 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
-        Optional<ConsumerBase<T>> subscriber = subscriptionExist(conf);
-        if (subscriber.isPresent()) {
-            return CompletableFuture.completedFuture(subscriber.get());
-        }
-
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
 
         ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
@@ -377,10 +372,6 @@ public class PulsarClientImpl implements PulsarClient {
         Mode subscriptionMode = convertRegexSubscriptionMode(conf.getRegexSubscriptionMode());
         TopicName destination = TopicName.get(regex);
         NamespaceName namespaceName = destination.getNamespaceObject();
-        Optional<ConsumerBase<T>> subscriber = subscriptionExist(conf);
-        if (subscriber.isPresent()) {
-            return CompletableFuture.completedFuture(subscriber.get());
-        }
 
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
         lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode)
@@ -688,8 +679,10 @@ public class PulsarClientImpl implements PulsarClient {
     private <T> Optional<ConsumerBase<T>> subscriptionExist(ConsumerConfigurationData<?> conf) {
         synchronized (consumers) {
             Optional<ConsumerBase<?>> subscriber = consumers.keySet().stream()
-                    .filter(consumerBase -> consumerBase.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
+                    .filter(c -> c.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
+                    .filter(c -> conf.getTopicNames().contains(c.getTopic()))
                     .filter(c -> c.getSubscription().equals(conf.getSubscriptionName()))
+                    .filter(Consumer::isConnected)
                     .findFirst();
             return subscriber.map(ConsumerBase.class::cast);
         }