You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/19 00:49:34 UTC

[pulsar] 19/26: Revert dup consumer and related code (#4142)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 164d237cbe597908aa8e9cda5a7b920a3b342777
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Mon Apr 29 15:06:14 2019 -0300

    Revert dup consumer and related code (#4142)
    
    * Revert "[cpp client] implement reference count for close() (#3863)"
    
    This reverts commit ee98e8b28ce1397c056757fe73115d21320ed883.
    
    * Revert Prevent dup consumers and refCount for cpp client
    
    Revert "[cpp client] Bugfix prevent dup consumer for same topic subscription"
    Revert "[Issue #3226][cpp client] Prevent dup consumers on same client cnx"
    
    This reverts commit fff02e2aa2064412dbae18b973eb2bb2abab25d8.
    This reverts commit 762e0ab9a52e2665a106766c211d45474adc833b.
    
    * Revert "Feature - implement reference count for ConsumerImpl (#3795)"
    
    This reverts commit ff4db8db12be2eb79d910e2b286306298f71320e.
    
    * Revert Prevent dup consumers and refCount for java client
    
    Revert "Prevent dup consumers on same client cnx with shared subscription"
    Revert "[java client] Bugfix prevent dup consumers for same topic subscribe"
    
    This reverts commit 231db030b9529737237721059b2a5b3044d4cab1.
    This reverts commit fb5dcd9a58524686f2f6208d41a1e82b5bbb8111.
---
 .../client/api/SimpleProducerConsumerTest.java     | 112 ---------------------
 pulsar-client-cpp/lib/ClientImpl.cc                |  13 ---
 pulsar-client-cpp/lib/ConsumerImpl.cc              |  12 ---
 pulsar-client-cpp/lib/ConsumerImpl.h               |   3 -
 pulsar-client-cpp/lib/ConsumerImplBase.h           |   1 -
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       |  82 ---------------
 .../apache/pulsar/client/impl/ConsumerBase.java    |   9 --
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   4 -
 .../pulsar/client/impl/PulsarClientImpl.java       |  19 ----
 .../org/apache/pulsar/storm/PulsarSpoutTest.java   |   7 +-
 10 files changed, 1 insertion(+), 261 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2f5d764..4067bc3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2907,116 +2907,4 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         assertEquals(latch.getCount(), 1);
         consumer.close();
     }
-
-    // Issue 3226: https://github.com/apache/pulsar/issues/3226
-    // Pull 3312: https://github.com/apache/pulsar/pull/3312
-    // Bugfix preventing duplicated consumers on same client cnx with shared subscription mode
-    @Test()
-    public void testPreventDupConsumersOnClientCnxForSingleSub() throws Exception {
-        final CompletableFuture<Void> future = new CompletableFuture<>();
-        final String topic = "persistent://my-property/my-ns/my-topic";
-        final String subName = "my-subscription";
-
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
-                .subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-        Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
-                .subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-
-        consumer.unsubscribeAsync().whenComplete((aVoid1, t1) -> {
-            if (t1 != null) {
-                future.completeExceptionally(t1);
-                return;
-            }
-
-            consumer.closeAsync().whenComplete((aVoid2, t2) -> {
-                if (t2 != null) {
-                    future.completeExceptionally(t2);
-                    return;
-                }
-                future.complete(null);
-            });
-        });
-
-        future.get(5, TimeUnit.SECONDS);
-        Assert.assertEquals(consumer, consumerB);
-        Assert.assertTrue(future.isDone());
-        Assert.assertFalse(future.isCompletedExceptionally());
-    }
-
-    @Test()
-    public void testPreventDupConsumersOnClientCnxForSingleSub_AllowDifferentTopics() throws Exception {
-        final CompletableFuture<Void> future = new CompletableFuture<>();
-        final String topic = "persistent://my-property/my-ns/my-topic";
-        final String subName = "my-subscription";
-
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
-                .subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-        Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
-                .subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-
-        // This consumer should be a newly subscription since is it from a different topic
-        // even though has the same subscription name.
-        Consumer<byte[]> consumerC = pulsarClient.newConsumer().topic(topic + "-different-topic")
-                .subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-
-        consumer.unsubscribeAsync().whenComplete((aVoid1, t1) -> {
-            if (t1 != null) {
-                future.completeExceptionally(t1);
-                return;
-            }
-
-            consumer.closeAsync().whenComplete((aVoid2, t2) -> {
-                if (t2 != null) {
-                    future.completeExceptionally(t2);
-                    return;
-                }
-                future.complete(null);
-            });
-        });
-
-        future.get(5, TimeUnit.SECONDS);
-        Assert.assertEquals(consumer, consumerB);
-        Assert.assertTrue(future.isDone());
-        Assert.assertFalse(future.isCompletedExceptionally());
-
-        // consumerC is a newly created subscription.
-        Assert.assertNotEquals(consumer, consumerC);
-        Assert.assertTrue(consumerC.isConnected());
-        consumerC.close();
-    }
-
-    @Test
-    public void testRefCount_OnCloseConsumer() throws Exception {
-        final String topic = "persistent://my-property/my-ns/my-topic";
-        final String subName = "my-subscription";
-
-        Consumer<byte[]> consumerA = pulsarClient.newConsumer().topic(topic)
-                .subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-        Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
-                .subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-
-        Assert.assertEquals(consumerA, consumerB);
-
-        consumerA.close();
-        Assert.assertTrue(consumerA.isConnected());
-        Assert.assertTrue(consumerB.isConnected());
-
-        consumerB.close();
-        Assert.assertFalse(consumerA.isConnected());
-        Assert.assertFalse(consumerB.isConnected());
-    }
 }
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index e2c15e0..0e996cc 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -340,19 +340,6 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& con
             lock.unlock();
             callback(ResultInvalidConfiguration, Consumer());
             return;
-        } else if (conf.getConsumerType() == ConsumerShared) {
-            ConsumersList consumers(consumers_);
-            for (auto& weakPtr : consumers) {
-                ConsumerImplBasePtr consumer = weakPtr.lock();
-                if (consumer && consumer->getSubscriptionName() == consumerName &&
-                    consumer->getTopic() == topic && !consumer->isClosed()) {
-                    consumer->incrRefCount();
-                    lock.unlock();
-                    LOG_INFO("Reusing existing consumer instance for " << topic << " -- " << consumerName);
-                    callback(ResultOk, Consumer(consumer));
-                    return;
-                }
-            }
         }
     }
 
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 3fce8d2..7f36e65 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -109,10 +109,6 @@ Future<Result, ConsumerImplBaseWeakPtr> ConsumerImpl::getConsumerCreatedFuture()
     return consumerCreatedPromise_.getFuture();
 }
 
-void ConsumerImpl::incrRefCount() { ++refCount_; }
-
-unsigned int ConsumerImpl::safeDecrRefCount() { return refCount_ > 0 ? refCount_-- : refCount_; }
-
 const std::string& ConsumerImpl::getSubscriptionName() const { return originalSubscriptionName_; }
 
 const std::string& ConsumerImpl::getTopic() const { return topic_; }
@@ -815,14 +811,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
-    if (safeDecrRefCount() != 0) {
-        lock.unlock();
-        if (callback) {
-            callback(ResultOk);
-        }
-        return;
-    }
-
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         lock.unlock();
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 7e1066c..b5fe761 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -107,7 +107,6 @@ class ConsumerImpl : public ConsumerImplBase,
     virtual bool isReadCompacted();
     virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
     virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback);
-    virtual void incrRefCount();
 
    protected:
     void connectionOpened(const ClientConnectionPtr& cnx);
@@ -141,7 +140,6 @@ class ConsumerImpl : public ConsumerImplBase,
     void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
     void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
     void failPendingReceiveCallback();
-    unsigned int safeDecrRefCount();
 
     Optional<MessageId> clearReceiveQueue();
 
@@ -171,7 +169,6 @@ class ConsumerImpl : public ConsumerImplBase,
     UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
     BatchAcknowledgementTracker batchAcknowledgementTracker_;
     BrokerConsumerStatsImpl brokerConsumerStats_;
-    unsigned int refCount_ = 0;
 
     MessageCryptoPtr msgCrypto_;
     const bool readCompacted_;
diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h
index e6be186..766b0a9 100644
--- a/pulsar-client-cpp/lib/ConsumerImplBase.h
+++ b/pulsar-client-cpp/lib/ConsumerImplBase.h
@@ -50,7 +50,6 @@ class ConsumerImplBase {
     virtual int getNumOfPrefetchedMessages() const = 0;
     virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
     virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0;
-    virtual void incrRefCount(){};
 };
 }  // namespace pulsar
 #endif  // PULSAR_CONSUMER_IMPL_BASE_HEADER
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 3ee85db..8e4de24 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -2794,85 +2794,3 @@ TEST(BasicEndToEndTest, testPartitionedReceiveAsyncFailedConsumer) {
     ASSERT_EQ(count, 0);
     client.shutdown();
 }
-
-TEST(BasicEndToEndTest, testPreventDupConsumersOnSharedMode) {
-    ClientConfiguration config;
-    Client client(lookupUrl);
-    std::string subsName = "my-only-sub";
-    std::string topicName = "persistent://public/default/test-prevent-dup-consumers";
-    ConsumerConfiguration consumerConf;
-    consumerConf.setConsumerType(ConsumerShared);
-
-    Consumer consumerA;
-    Result resultA = client.subscribe(topicName, subsName, consumerConf, consumerA);
-    ASSERT_EQ(ResultOk, resultA);
-    ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
-
-    Consumer consumerB;
-    Result resultB = client.subscribe(topicName, subsName, consumerConf, consumerB);
-    ASSERT_EQ(ResultOk, resultB);
-    ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
-
-    // Since this is a shared consumer over same client cnx
-    // closing consumerA should result in consumerB also being closed.
-    ASSERT_EQ(ResultOk, consumerA.close());
-    ASSERT_EQ(ResultOk, consumerB.close());
-    ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
-    ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
-}
-
-TEST(BasicEndToEndTest, testDupConsumersOnSharedModeNotThrowsExcOnUnsubscribe) {
-    ClientConfiguration config;
-    Client client(lookupUrl);
-    std::string subsName = "my-only-sub";
-    std::string topicName =
-        "persistent://public/default/testDupConsumersOnSharedModeNotThrowsExcOnUnsubscribe";
-    ConsumerConfiguration consumerConf;
-    consumerConf.setConsumerType(ConsumerShared);
-
-    Consumer consumerA;
-    Result resultA = client.subscribe(topicName, subsName, consumerConf, consumerA);
-    ASSERT_EQ(ResultOk, resultA);
-    ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
-
-    Consumer consumerB;
-    Result resultB = client.subscribe(topicName, subsName, consumerConf, consumerB);
-    ASSERT_EQ(ResultOk, resultB);
-    ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
-
-    ASSERT_EQ(ResultOk, consumerA.unsubscribe());
-    // If dup consumers are allowed BrokerMetadataError will be the result of close()
-    ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
-}
-
-TEST(BasicEndToEndTest, testPreventDupConsumersAllowSameSubForDifferentTopics) {
-    ClientConfiguration config;
-    Client client(lookupUrl);
-    std::string subsName = "my-only-sub";
-    std::string topicName =
-        "persistent://public/default/testPreventDupConsumersAllowSameSubForDifferentTopics";
-    ConsumerConfiguration consumerConf;
-    consumerConf.setConsumerType(ConsumerShared);
-
-    Consumer consumerA;
-    Result resultA = client.subscribe(topicName, subsName, consumerConf, consumerA);
-    ASSERT_EQ(ResultOk, resultA);
-    ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
-
-    Consumer consumerB;
-    Result resultB = client.subscribe(topicName, subsName, consumerConf, consumerB);
-    ASSERT_EQ(ResultOk, resultB);
-    ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
-
-    Consumer consumerC;
-    Result resultC = client.subscribe(topicName + "-different-topic", subsName, consumerConf, consumerC);
-    ASSERT_EQ(ResultOk, resultB);
-    ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
-    ASSERT_EQ(ResultOk, consumerA.close());
-    ASSERT_EQ(ResultOk, consumerB.close());
-    ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
-    ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
-
-    // consumer C should be a different instance from A and B and should be with open state.
-    ASSERT_EQ(ResultOk, consumerC.close());
-}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 7132ed0..71fc310 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -61,7 +61,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected int maxReceiverQueueSize;
     protected final Schema<T> schema;
     protected final ConsumerInterceptors<T> interceptors;
-    private int refCount = 0;
 
     protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                            int receiverQueueSize, ExecutorService listenerExecutor,
@@ -380,12 +379,4 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
             interceptors.onAcknowledgeCumulative(this, messageId, exception);
         }
     }
-
-    protected synchronized void incrRefCount() {
-        ++refCount;
-    }
-
-    protected synchronized boolean shouldTearDown() {
-        return refCount > 0 ? refCount-- == 0 : refCount == 0;
-    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 266f483..4ecadfe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -681,10 +681,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        if (!shouldTearDown()) {
-            return CompletableFuture.completedFuture(null);
-        }
-
         if (getState() == State.Closing || getState() == State.Closed) {
             unAckedMessageTracker.close();
             if (possibleSendToDeadLetterTopicMessages != null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index aa6c671..a2027ad 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -311,11 +311,6 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
-        Optional<ConsumerBase<T>> subscriber = subscriptionExist(conf);
-        if (subscriber.isPresent()) {
-            return CompletableFuture.completedFuture(subscriber.get());
-        }
-
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
 
         String topic = conf.getSingleTopic();
@@ -674,20 +669,6 @@ public class PulsarClientImpl implements PulsarClient {
         });
     }
 
-    @SuppressWarnings("unchecked")
-    private <T> Optional<ConsumerBase<T>> subscriptionExist(ConsumerConfigurationData<?> conf) {
-        synchronized (consumers) {
-            Optional<ConsumerBase<?>> subscriber = consumers.keySet().stream()
-                    .filter(c -> c.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
-                    .filter(c -> conf.getTopicNames().contains(c.getTopic()))
-                    .filter(c -> c.getSubscription().equals(conf.getSubscriptionName()))
-                    .filter(Consumer::isConnected)
-                    .findFirst();
-            subscriber.ifPresent(ConsumerBase::incrRefCount);
-            return subscriber.map(ConsumerBase.class::cast);
-        }
-    }
-
     private static EventLoopGroup getEventLoopGroup(ClientConfigurationData conf) {
         ThreadFactory threadFactory = getThreadFactory("pulsar-client-io");
         return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index 877c927..d316be3 100644
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -301,17 +301,12 @@ public class PulsarSpoutTest extends ProducerConsumerBase {
         otherSpout.open(Maps.newHashMap(), context, collector);
 
         topicStats = admin.topics().getStats(topic);
-        Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
+        Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 2);
 
         otherSpout.close();
 
         topicStats = admin.topics().getStats(topic);
         Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
-
-        otherSpout.close();
-
-        topicStats = admin.topics().getStats(topic);
-        Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 0);
     }
 
     @Test