You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/01/27 08:24:58 UTC

[pulsar] branch master updated: Prevent dup consumers on same client cnx with shared subscription (#3312)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 231db03  Prevent dup consumers on same client cnx with shared subscription (#3312)
231db03 is described below

commit 231db030b9529737237721059b2a5b3044d4cab1
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Sun Jan 27 05:24:53 2019 -0300

    Prevent dup consumers on same client cnx with shared subscription (#3312)
    
    * Prevent dup consumers on same client cnx with shared subscription
    
    Providing help trying to fix issue #3226. Bug description:
    
    When a client attempts to setup more than one consumer subscription on shared
    mode with the same subscription name, due to the validation at broker level of
    `consumerList.size() == 1` on canUnsubscribe() method, broker will throw an
    exception at the moment the client tries to unsubscribe the consumer.
    
    In order to prevent this, the proposed solution (probably not the best one) is
    to detect when the user is trying to setup an already subscribed consumer and
    return this exact same consumer instance.
    
    I believe that is quite strange to have two or more consumers with shared mode
    on the same connection for the same subscription, this might be due to the user
    confusing about consumer shared mode or behaviour, another good solution to
    prevent this from happening might be just throwing an invalid configuration
    exception.
    
    * Adapt tests for bugfix introduced in commit 44e1a23
    
    * Add test case exploiting issue 3326 described in commit 44e1a23
---
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   9 +-
 .../broker/service/PersistentQueueE2ETest.java     |  34 +++---
 .../broker/service/PersistentTopicE2ETest.java     |   6 +
 .../pulsar/broker/service/ResendRequestTest.java   |  29 +++--
 .../pulsar/client/api/DeadLetterTopicTest.java     |  14 ++-
 .../client/api/DispatcherBlockConsumerTest.java    |   8 +-
 .../api/PartitionedProducerConsumerTest.java       |  14 +++
 .../client/api/SimpleProducerConsumerTest.java     | 132 +++++++++++++++++----
 .../client/api/v1/V1_ProducerConsumerTest.java     |  77 ++++++++----
 .../client/impl/BrokerClientIntegrationTest.java   |   6 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |  32 +++--
 .../org/apache/pulsar/storm/PulsarSpoutTest.java   |   4 +-
 12 files changed, 272 insertions(+), 93 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 0720d3e..99e7ae4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
@@ -111,7 +112,11 @@ public abstract class MockedPulsarServiceBaseTest {
         if (isTcpLookup) {
             lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT);
         }
-        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();
+        pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+    }
+
+    protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+        return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
     }
 
     protected final void internalSetupForStatsTest() throws Exception {
@@ -120,7 +125,7 @@ public abstract class MockedPulsarServiceBaseTest {
         if (isTcpLookup) {
             lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
         }
-        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(1, TimeUnit.SECONDS).build();
+        pulsarClient = newPulsarClient(lookupUrl, 1);
     }
 
     protected final void init() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index fb7a76c..e0ae197 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -36,14 +36,9 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
@@ -91,12 +86,13 @@ public class PersistentQueueE2ETest extends BrokerTestBase {
         final String subName = "sub1";
         final int numMsgs = 100;
 
-        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared);
-
         // 1. two consumers on the same subscription
-        Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
-        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
 
         PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
         PersistentSubscription subRef = topicRef.getSubscription(subName);
@@ -180,6 +176,7 @@ public class PersistentQueueE2ETest extends BrokerTestBase {
 
         producer.close();
         consumer2.close();
+        newPulsarClient.close();
 
         deleteTopic(topicName);
     }
@@ -204,7 +201,8 @@ public class PersistentQueueE2ETest extends BrokerTestBase {
                 }).subscribe();
 
         // consumer2 does not ack messages
-        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                 .subscriptionType(SubscriptionType.Shared).messageListener((consumer, msg) -> {
                     // do notthing
                 }).subscribe();
@@ -229,6 +227,7 @@ public class PersistentQueueE2ETest extends BrokerTestBase {
         assertTrue(CollectionUtils.subtract(messagesProduced, messagesConsumed).isEmpty());
 
         consumer1.close();
+        newPulsarClient.close();
 
         deleteTopic(topicName);
     }
@@ -322,10 +321,14 @@ public class PersistentQueueE2ETest extends BrokerTestBase {
         assertEquals(topicRef.getProducers().size(), 1);
 
         // 2. Create consumer
-        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
+        ConsumerBuilder<byte[]> consumerBuilder1 = pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Shared);
-        Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
-        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+        Consumer<byte[]> consumer1 = consumerBuilder1.subscribe();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        ConsumerBuilder<byte[]> consumerBuilder2 = newPulsarClient.newConsumer().topic(topicName)
+                .subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Shared);
+        Consumer<byte[]> consumer2 = consumerBuilder2.subscribe();
 
         // 3. Producer publishes messages
         for (int i = 0; i < totalMessages; i++) {
@@ -369,6 +372,7 @@ public class PersistentQueueE2ETest extends BrokerTestBase {
             receivedConsumer2 += 1;
         }
 
+        newPulsarClient.close();
         log.info("Total receives by Consumer 2 = " + receivedConsumer2);
         assertEquals(receivedConsumer2, totalMessages);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 2286862..38fec89 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -797,6 +797,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
 
         // 1. shared consumer on an exclusive sub fails
         try {
+            PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
             consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                     .subscriptionType(SubscriptionType.Shared).subscribe();
             fail("should have failed");
@@ -806,6 +807,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
 
         // 2. failover consumer on an exclusive sub fails
         try {
+            PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
             consumer3 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                     .subscriptionType(SubscriptionType.Failover).subscribe();
             fail("should have failed");
@@ -816,6 +818,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         // 3. disconnected sub can be converted in shared
         consumer1.close();
         try {
+            PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
             consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                     .subscriptionType(SubscriptionType.Shared).subscribe();
             assertEquals(subRef.getDispatcher().getType(), SubType.Shared);
@@ -825,6 +828,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
 
         // 4. exclusive fails on shared sub
         try {
+            PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
             consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                     .subscriptionType(SubscriptionType.Exclusive).subscribe();
             fail("should have failed");
@@ -835,6 +839,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         // 5. disconnected sub can be converted in failover
         consumer2.close();
         try {
+            PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
             consumer3 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                     .subscriptionType(SubscriptionType.Failover).subscribe();
             assertEquals(subRef.getDispatcher().getType(), SubType.Failover);
@@ -845,6 +850,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         // 5. exclusive consumer can connect after failover disconnects
         consumer3.close();
         try {
+            PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
             consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                     .subscriptionType(SubscriptionType.Exclusive).subscribe();
             assertEquals(subRef.getDispatcher().getType(), SubType.Exclusive);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index eecf0f2..049daba 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -29,13 +29,9 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.slf4j.Logger;
@@ -171,11 +167,14 @@ public class ResendRequestTest extends BrokerTestBase {
         assertEquals(topicRef.getProducers().size(), 1);
 
         // 2. Create consumer
-        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName(subscriptionName).receiverQueueSize(totalMessages / 2)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient.newConsumer().topic(topicName)
                 .subscriptionName(subscriptionName).receiverQueueSize(totalMessages / 2)
-                .subscriptionType(SubscriptionType.Shared);
-        Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
-        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+                .subscriptionType(SubscriptionType.Shared).subscribe();
 
         // 3. Producer publishes messages
         for (int i = 0; i < totalMessages; i++) {
@@ -232,6 +231,7 @@ public class ResendRequestTest extends BrokerTestBase {
             message2 = consumer2.receive(200, TimeUnit.MILLISECONDS);
         } while (message1 != null || message2 != null);
         log.info("Additional received = " + receivedMessagesAfterRedelivery);
+        newPulsarClient.close();
         assertTrue(receivedMessagesAfterRedelivery > 0);
 
         assertEquals(receivedConsumer1 + receivedConsumer2, totalMessages);
@@ -486,10 +486,12 @@ public class ResendRequestTest extends BrokerTestBase {
             .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         // 2. Create consumer
-        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
-                .subscriptionName(subscriptionName).receiverQueueSize(7).subscriptionType(SubscriptionType.Shared);
-        Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
-        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                .receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                .receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
 
         // 3. producer publish messages
         for (int i = 0; i < totalMessages; i++) {
@@ -561,6 +563,7 @@ public class ResendRequestTest extends BrokerTestBase {
         log.info(key + " messageCount2 = " + messageCount2);
         log.info(key + " ackCount1 = " + ackCount1);
         log.info(key + " ackCount2 = " + ackCount2);
+        newPulsarClient.close();
         assertEquals(messageCount1 + messageCount2 + ackCount1, totalMessages);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 82df18c..22177ba 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -64,7 +64,8 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
 
-        Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
                 .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
@@ -98,7 +99,7 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
         deadLetterConsumer.close();
         consumer.close();
 
-        Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
+        Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
                 .topic(topic)
                 .subscriptionName("my-subscription")
                 .subscriptionType(SubscriptionType.Shared)
@@ -112,6 +113,7 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
         assertNull(checkMessage);
 
         checkConsumer.close();
+        newPulsarClient.close();
     }
 
     /**
@@ -216,7 +218,8 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
                         .build())
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
-        Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
                 .topic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ")
                 .subscriptionName("my-subscription")
                 .subscribe();
@@ -244,7 +247,8 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
         } while (totalInDeadLetter < sendMessages);
         deadLetterConsumer.close();
         consumer.close();
-        Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> checkConsumer = newPulsarClient1.newConsumer(Schema.BYTES)
                 .topic(topic)
                 .subscriptionName("my-subscription")
                 .subscriptionType(SubscriptionType.Shared)
@@ -255,6 +259,8 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
             log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
         }
         assertNull(checkMessage);
+        newPulsarClient.close();
+        newPulsarClient1.close();
         checkConsumer.close();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index 727ce62..208aa37 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -750,8 +750,9 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
             }
             // client must receive number of messages = maxUnAckPerbroker rather all produced messages
             assertNotEquals(messages1.size(), totalProducedMsgs);
+            PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
             // (1.b) consumer2 with same sub should not receive any more messages as subscription is blocked
-            ConsumerImpl<byte[]> consumer2Sub1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
+            ConsumerImpl<byte[]> consumer2Sub1 = (ConsumerImpl<byte[]>) newPulsarClient.newConsumer().topic(topicName)
                     .subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize)
                     .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
             int consumer2Msgs = 0;
@@ -847,6 +848,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
             consumer1Sub1.close();
             consumerSub2.close();
             consumer1Sub3.close();
+            newPulsarClient.close();
 
             log.info("-- Exiting {} test --", methodName);
         } catch (Exception e) {
@@ -947,7 +949,8 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
             // client must receive number of messages = maxUnAckPerbroker rather all produced messages
             assertNotEquals(messages1.size(), totalProducedMsgs);
             // (1.b) consumer2 with same sub should not receive any more messages as subscription is blocked
-            ConsumerImpl<byte[]> consumer2Sub1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
+            PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+            ConsumerImpl<byte[]> consumer2Sub1 = (ConsumerImpl<byte[]>) newPulsarClient.newConsumer().topic(topicName)
                     .subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize)
                     .subscriptionType(SubscriptionType.Shared).subscribe();
             int consumer2Msgs = 0;
@@ -1012,6 +1015,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
 
             consumer1Sub1.close();
             consumer1Sub2.close();
+            newPulsarClient.close();
 
             log.info("-- Exiting {} test --", methodName);
         } catch (Exception e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index e49a3d4..137c57d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -72,6 +72,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
     @Test(timeOut = 30000)
     public void testRoundRobinProducer() throws Exception {
         log.info("-- Starting {} test --", methodName);
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
 
         int numPartitions = 4;
         TopicName topicName = TopicName.get("persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis());
@@ -104,6 +105,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
         producer.close();
         consumer.unsubscribe();
         consumer.close();
+        pulsarClient.close();
         admin.topics().deletePartitionedTopic(topicName.toString());
 
         log.info("-- Exiting {} test --", methodName);
@@ -129,6 +131,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
 
     @Test(timeOut = 30000)
     public void testCustomPartitionProducer() throws Exception {
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
         TopicName topicName = null;
         Producer<byte[]> producer = null;
         Consumer<byte[]> consumer = null;
@@ -170,6 +173,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
             producer.close();
             consumer.unsubscribe();
             consumer.close();
+            pulsarClient.close();
             admin.topics().deletePartitionedTopic(topicName.toString());
 
             log.info("-- Exiting {} test --", methodName);
@@ -179,6 +183,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
     @Test(timeOut = 30000)
     public void testSinglePartitionProducer() throws Exception {
         log.info("-- Starting {} test --", methodName);
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
 
         int numPartitions = 4;
         TopicName topicName = TopicName
@@ -213,6 +218,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
         producer.close();
         consumer.unsubscribe();
         consumer.close();
+        pulsarClient.close();
         admin.topics().deletePartitionedTopic(topicName.toString());
 
         log.info("-- Exiting {} test --", methodName);
@@ -221,6 +227,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
     @Test(timeOut = 30000)
     public void testKeyBasedProducer() throws Exception {
         log.info("-- Starting {} test --", methodName);
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
 
         int numPartitions = 4;
         TopicName topicName = TopicName
@@ -257,6 +264,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
         producer.close();
         consumer.unsubscribe();
         consumer.close();
+        pulsarClient.close();
         admin.topics().deletePartitionedTopic(topicName.toString());
 
         log.info("-- Exiting {} test --", methodName);
@@ -274,6 +282,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
     @Test(timeOut = 100000)
     public void testPauseAndResume() throws Exception {
         log.info("-- Starting {} test --", methodName);
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
 
         int numPartitions = 2;
         String topicName = TopicName
@@ -320,6 +329,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
 
         consumer.close();
         producer.close();
+        pulsarClient.close();
         log.info("-- Exiting {} test --", methodName);
     }
 
@@ -496,6 +506,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
     @Test(timeOut = 30000)
     public void testAsyncPartitionedProducerConsumerQueueSizeOne() throws Exception {
         log.info("-- Starting {} test --", methodName);
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
 
         final int totalMsg = 100;
         final Set<String> produceMsgs = Sets.newHashSet();
@@ -538,6 +549,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
         producer.close();
         consumer.unsubscribe();
         consumer.close();
+        pulsarClient.close();
         admin.topics().deletePartitionedTopic(topicName.toString());
 
         log.info("-- Exiting {} test --", methodName);
@@ -551,6 +563,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
     @Test(timeOut = 30000)
     public void testFairDistributionForPartitionConsumers() throws Exception {
         log.info("-- Starting {} test --", methodName);
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
 
         final int numPartitions = 2;
         final String topicName = "persistent://my-property/my-ns/my-topic-" + System.currentTimeMillis();
@@ -600,6 +613,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
         producer2.close();
         consumer.unsubscribe();
         consumer.close();
+        pulsarClient.close();
         admin.topics().deletePartitionedTopic(topicName);
 
         log.info("-- Exiting {} test --", methodName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 69aa1d3..963ea8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -1278,11 +1278,15 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             int totalReceiveMessages = 0;
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
-            ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
+            Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
                     .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
-                    .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared);
-            Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
-            Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+                    .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
+
+            PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+            Consumer<byte[]> consumer2 = newPulsarClient.newConsumer()
+                    .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
+
             Producer<byte[]> producer = pulsarClient.newProducer()
                     .topic("persistent://my-property/my-ns/unacked-topic").create();
 
@@ -1350,6 +1354,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             producer.close();
             consumer1.close();
             consumer2.close();
+            newPulsarClient.close();
             log.info("-- Exiting {} test --", methodName);
         } catch (Exception e) {
             fail();
@@ -1876,14 +1881,25 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
     @Test
     public void testPriorityConsumer() throws Exception {
         log.info("-- Starting {} test --", methodName);
-        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
                 .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
-                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).priorityLevel(1);
+                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).priorityLevel(1).subscribe();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient.newConsumer()
+                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
+                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).priorityLevel(1).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer3 = newPulsarClient1.newConsumer()
+                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
+                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).priorityLevel(1).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer4 = newPulsarClient2.newConsumer()
+                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
+                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).priorityLevel(2).subscribe();
 
-        Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
-        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
-        Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
-        Consumer<byte[]> consumer4 = consumerBuilder.clone().priorityLevel(2).subscribe();
         Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2")
                 .create();
         List<Future<MessageId>> futures = Lists.newArrayList();
@@ -1924,6 +1940,9 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         consumer2.close();
         consumer3.close();
         consumer4.close();
+        newPulsarClient.close();
+        newPulsarClient1.close();
+        newPulsarClient2.close();
         log.info("-- Exiting {} test --", methodName);
     }
 
@@ -1946,16 +1965,22 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         int maxUnAckMsgs = pulsar.getConfiguration().getMaxConcurrentLookupRequest();
         pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(queueSize);
 
-        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
-                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
-                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
-                .acknowledgmentGroupTime(0, TimeUnit.SECONDS);
-        Consumer<byte[]> c1 = consumerBuilder.subscribe();
-        Consumer<byte[]> c2 = consumerBuilder.subscribe();
         Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> c1 = newPulsarClient.newConsumer()
+                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
+                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> c2 = newPulsarClient1.newConsumer()
+                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
+                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
         List<Future<MessageId>> futures = Lists.newArrayList();
 
         // Asynchronously produce messages
@@ -1994,9 +2019,23 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         Assert.assertEquals(queueSize * 2, messages.size());
 
         // create new consumers with the same priority
-        Consumer<byte[]> c3 = consumerBuilder.subscribe();
-        Consumer<byte[]> c4 = consumerBuilder.subscribe();
-        Consumer<byte[]> c5 = consumerBuilder.subscribe();
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> c3 = newPulsarClient2.newConsumer()
+                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
+                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> c4 = newPulsarClient3.newConsumer()
+                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
+                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient4 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> c5 = newPulsarClient4.newConsumer()
+                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
+                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
 
         // c1 and c2 are blocked: so, let c3, c4 and c5 consume rest of the messages
 
@@ -2038,6 +2077,11 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         c3.close();
         c4.close();
         c5.close();
+        newPulsarClient.close();
+        newPulsarClient1.close();
+        newPulsarClient2.close();
+        newPulsarClient3.close();
+        newPulsarClient4.close();
         pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnAckMsgs);
         log.info("-- Exiting {} test --", methodName);
     }
@@ -2389,15 +2433,18 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
                 .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
                 .cryptoKeyReader(new EncKeyReader()).create();
 
-        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topicsPattern(topicName)
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName)
                 .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
                 .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
 
-        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topicsPattern(topicName)
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName)
                 .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
                 .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
 
-        Consumer<byte[]> consumer3 = pulsarClient.newConsumer().topicsPattern(topicName)
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName)
                 .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
 
         int numberOfMessages = 100;
@@ -2436,6 +2483,9 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         consumer1.close();
         consumer2.close();
         consumer3.close();
+        newPulsarClient.close();
+        newPulsarClient1.close();
+        newPulsarClient2.close();
         log.info("-- Exiting {} test --", methodName);
     }
 
@@ -2842,4 +2892,42 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         assertEquals(latch.getCount(), 1);
         consumer.close();
     }
+
+    // Issue 3226: https://github.com/apache/pulsar/issues/3226
+    // Pull 3312: https://github.com/apache/pulsar/pull/3312
+    // Bugfix preventing duplicated consumers on same client cnx with shared subscription mode
+    @Test()
+    public void testPreventDupConsumersOnClientCnx() throws Exception {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        final String topic = "persistent://my-property/my-ns/my-topic";
+        final String subName = "my-subscription";
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+        Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        consumer.unsubscribeAsync().whenComplete((aVoid1, t1) -> {
+            if (t1 != null) {
+                future.completeExceptionally(t1);
+                return;
+            }
+
+            consumer.closeAsync().whenComplete((aVoid2, t2) -> {
+                if (t2 != null) {
+                    future.completeExceptionally(t2);
+                    return;
+                }
+                future.complete(null);
+            });
+        });
+
+        future.get(5, TimeUnit.SECONDS);
+        Assert.assertTrue(future.isDone());
+        Assert.assertFalse(future.isCompletedExceptionally());
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index bbda912..ab7163a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -1292,7 +1292,9 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
                     .receiverQueueSize(receiverQueueSize)
                     .subscriptionType(SubscriptionType.Shared)
                     .subscribe();
-            Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+
+            PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+            Consumer<byte[]> consumer2 = newPulsarClient.newConsumer()
                     .topic("persistent://my-property/use/my-ns/unacked-topic")
                     .subscriptionName("subscriber-1")
                     .receiverQueueSize(receiverQueueSize)
@@ -1367,6 +1369,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
             producer.close();
             consumer1.close();
             consumer2.close();
+            newPulsarClient.close();
             log.info("-- Exiting {} test --", methodName);
         } catch (Exception e) {
             fail();
@@ -1914,19 +1917,25 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
     public void testPriorityConsumer() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        ConsumerBuilder<byte[]> cb1 = pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/my-topic2")
-                    .subscriptionName("my-subscriber-name")
-                    .subscriptionType(SubscriptionType.Shared)
-                    .priorityLevel(1)
-                    .receiverQueueSize(5);
-        ConsumerBuilder<byte[]> cb4 = cb1.clone()
-                .priorityLevel(2)
-                .receiverQueueSize(5);
-        Consumer<byte[]> consumer1 = cb1.subscribe();
-        Consumer<byte[]> consumer2 = cb1.subscribe();
-        Consumer<byte[]> consumer3 = cb1.subscribe();
-        Consumer<byte[]> consumer4 = cb4.subscribe();
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
+                .priorityLevel(1).receiverQueueSize(5).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
+                .priorityLevel(1).receiverQueueSize(5).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
+                .priorityLevel(1).receiverQueueSize(5).subscribe();
+
+        PulsarClient newPulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer4 = newPulsarClient3.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
+                .priorityLevel(2).receiverQueueSize(5).subscribe();
 
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic("persistent://my-property/use/my-ns/my-topic2")
@@ -1969,6 +1978,10 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         consumer2.close();
         consumer3.close();
         consumer4.close();
+        newPulsarClient.close();
+        newPulsarClient1.close();
+        newPulsarClient2.close();
+        newPulsarClient3.close();
         log.info("-- Exiting {} test --", methodName);
     }
 
@@ -1992,13 +2005,14 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         int maxUnAckMsgs = pulsar.getConfiguration().getMaxConcurrentLookupRequest();
         pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(queueSize);
 
-        ConsumerBuilder<byte[]> cb = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/my-topic2")
-                .subscriptionName("my-subscriber-name")
-                .subscriptionType(SubscriptionType.Shared)
-                .receiverQueueSize(queueSize);
-        Consumer<byte[]> c1 = cb.subscribe();
-        Consumer<byte[]> c2 = cb.subscribe();
+        Consumer<byte[]> c1 = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
+                .receiverQueueSize(queueSize).subscribe();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> c2 = newPulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
+                .receiverQueueSize(queueSize).subscribe();
 
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic("persistent://my-property/use/my-ns/my-topic2")
@@ -2042,9 +2056,20 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         Assert.assertEquals(queueSize * 2, messages.size());
 
         // create new consumers with the same priority
-        Consumer<byte[]> c3 = cb.subscribe();
-        Consumer<byte[]> c4 = cb.subscribe();
-        Consumer<byte[]> c5 = cb.subscribe();
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> c3 = newPulsarClient1.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
+                .receiverQueueSize(queueSize).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> c4 = newPulsarClient2.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
+                .receiverQueueSize(queueSize).subscribe();
+
+        PulsarClient newPulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> c5 = newPulsarClient3.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
+                .receiverQueueSize(queueSize).subscribe();
 
         // c1 and c2 are blocked: so, let c3, c4 and c5 consume rest of the messages
 
@@ -2086,6 +2111,10 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         c3.close();
         c4.close();
         c5.close();
+        newPulsarClient.close();
+        newPulsarClient1.close();
+        newPulsarClient2.close();
+        newPulsarClient3.close();
         pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnAckMsgs);
         log.info("-- Exiting {} test --", methodName);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index feb5b9e..4447592 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -360,8 +360,9 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
         msg = consumer1.receive(2, TimeUnit.SECONDS);
         assertNull(msg);
 
-        // subscrie consumer2 with supporting batch version
-        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+        // subscribe consumer2 with supporting batch version
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0); // Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                 .subscribe();
 
         messageSet.clear();
@@ -377,6 +378,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
         consumer2.close();
         producer.close();
         batchProducer.close();
+        newPulsarClient.close();
         log.info("-- Exiting {} test --", methodName);
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 42189d8..26b7937 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -30,11 +30,7 @@ import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.IdentityHashMap;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -64,6 +60,7 @@ import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
 import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -313,9 +310,12 @@ public class PulsarClientImpl implements PulsarClient {
         }
     }
 
-
-
     private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
+        Optional<ConsumerBase<T>> subscriber = subscriptionExist(conf);
+        if (subscriber.isPresent()) {
+            return CompletableFuture.completedFuture(subscriber.get());
+        }
+
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
 
         String topic = conf.getSingleTopic();
@@ -349,6 +349,11 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
+        Optional<ConsumerBase<T>> subscriber = subscriptionExist(conf);
+        if (subscriber.isPresent()) {
+            return CompletableFuture.completedFuture(subscriber.get());
+        }
+
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
 
         ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
@@ -371,6 +376,10 @@ public class PulsarClientImpl implements PulsarClient {
         Mode subscriptionMode = convertRegexSubscriptionMode(conf.getRegexSubscriptionMode());
         TopicName destination = TopicName.get(regex);
         NamespaceName namespaceName = destination.getNamespaceObject();
+        Optional<ConsumerBase<T>> subscriber = subscriptionExist(conf);
+        if (subscriber.isPresent()) {
+            return CompletableFuture.completedFuture(subscriber.get());
+        }
 
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
         lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode)
@@ -674,6 +683,15 @@ public class PulsarClientImpl implements PulsarClient {
         });
     }
 
+    @SuppressWarnings("unchecked")
+    private <T> Optional<ConsumerBase<T>> subscriptionExist(ConsumerConfigurationData<?> conf) {
+        Optional<ConsumerBase<?>> subscriber = consumers.keySet().stream()
+                .filter(consumerBase -> consumerBase.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
+                .filter(c -> c.getSubscription().equals(conf.getSubscriptionName()))
+                .findFirst();
+        return subscriber.map(ConsumerBase.class::cast);
+    }
+
     private static EventLoopGroup getEventLoopGroup(ClientConfigurationData conf) {
         ThreadFactory threadFactory = getThreadFactory("pulsar-client-io");
         return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index d316be3..a21e216 100644
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -301,12 +301,12 @@ public class PulsarSpoutTest extends ProducerConsumerBase {
         otherSpout.open(Maps.newHashMap(), context, collector);
 
         topicStats = admin.topics().getStats(topic);
-        Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 2);
+        Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
 
         otherSpout.close();
 
         topicStats = admin.topics().getStats(topic);
-        Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
+        Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 0);
     }
 
     @Test