You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/19 00:49:34 UTC
[pulsar] 19/26: Revert dup consumer and related code (#4142)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 164d237cbe597908aa8e9cda5a7b920a3b342777
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Mon Apr 29 15:06:14 2019 -0300
Revert dup consumer and related code (#4142)
* Revert "[cpp client] implement reference count for close() (#3863)"
This reverts commit ee98e8b28ce1397c056757fe73115d21320ed883.
* Revert Prevent dup consumers and refCount for cpp client
Revert "[cpp client] Bugfix prevent dup consumer for same topic subscription"
Revert "[Issue #3226][cpp client] Prevent dup consumers on same client cnx"
This reverts commit fff02e2aa2064412dbae18b973eb2bb2abab25d8.
This reverts commit 762e0ab9a52e2665a106766c211d45474adc833b.
* Revert "Feature - implement reference count for ConsumerImpl (#3795)"
This reverts commit ff4db8db12be2eb79d910e2b286306298f71320e.
* Revert Prevent dup consumers and refCount for java client
Revert "Prevent dup consumers on same client cnx with shared subscription"
Revert "[java client] Bugfix prevent dup consumers for same topic subscribe"
This reverts commit 231db030b9529737237721059b2a5b3044d4cab1.
This reverts commit fb5dcd9a58524686f2f6208d41a1e82b5bbb8111.
---
.../client/api/SimpleProducerConsumerTest.java | 112 ---------------------
pulsar-client-cpp/lib/ClientImpl.cc | 13 ---
pulsar-client-cpp/lib/ConsumerImpl.cc | 12 ---
pulsar-client-cpp/lib/ConsumerImpl.h | 3 -
pulsar-client-cpp/lib/ConsumerImplBase.h | 1 -
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 82 ---------------
.../apache/pulsar/client/impl/ConsumerBase.java | 9 --
.../apache/pulsar/client/impl/ConsumerImpl.java | 4 -
.../pulsar/client/impl/PulsarClientImpl.java | 19 ----
.../org/apache/pulsar/storm/PulsarSpoutTest.java | 7 +-
10 files changed, 1 insertion(+), 261 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2f5d764..4067bc3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2907,116 +2907,4 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
assertEquals(latch.getCount(), 1);
consumer.close();
}
-
- // Issue 3226: https://github.com/apache/pulsar/issues/3226
- // Pull 3312: https://github.com/apache/pulsar/pull/3312
- // Bugfix preventing duplicated consumers on same client cnx with shared subscription mode
- @Test()
- public void testPreventDupConsumersOnClientCnxForSingleSub() throws Exception {
- final CompletableFuture<Void> future = new CompletableFuture<>();
- final String topic = "persistent://my-property/my-ns/my-topic";
- final String subName = "my-subscription";
-
- Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
- .subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
- Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
- .subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
-
- consumer.unsubscribeAsync().whenComplete((aVoid1, t1) -> {
- if (t1 != null) {
- future.completeExceptionally(t1);
- return;
- }
-
- consumer.closeAsync().whenComplete((aVoid2, t2) -> {
- if (t2 != null) {
- future.completeExceptionally(t2);
- return;
- }
- future.complete(null);
- });
- });
-
- future.get(5, TimeUnit.SECONDS);
- Assert.assertEquals(consumer, consumerB);
- Assert.assertTrue(future.isDone());
- Assert.assertFalse(future.isCompletedExceptionally());
- }
-
- @Test()
- public void testPreventDupConsumersOnClientCnxForSingleSub_AllowDifferentTopics() throws Exception {
- final CompletableFuture<Void> future = new CompletableFuture<>();
- final String topic = "persistent://my-property/my-ns/my-topic";
- final String subName = "my-subscription";
-
- Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
- .subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
- Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
- .subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
-
- // This consumer should be a newly subscription since is it from a different topic
- // even though has the same subscription name.
- Consumer<byte[]> consumerC = pulsarClient.newConsumer().topic(topic + "-different-topic")
- .subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
-
- consumer.unsubscribeAsync().whenComplete((aVoid1, t1) -> {
- if (t1 != null) {
- future.completeExceptionally(t1);
- return;
- }
-
- consumer.closeAsync().whenComplete((aVoid2, t2) -> {
- if (t2 != null) {
- future.completeExceptionally(t2);
- return;
- }
- future.complete(null);
- });
- });
-
- future.get(5, TimeUnit.SECONDS);
- Assert.assertEquals(consumer, consumerB);
- Assert.assertTrue(future.isDone());
- Assert.assertFalse(future.isCompletedExceptionally());
-
- // consumerC is a newly created subscription.
- Assert.assertNotEquals(consumer, consumerC);
- Assert.assertTrue(consumerC.isConnected());
- consumerC.close();
- }
-
- @Test
- public void testRefCount_OnCloseConsumer() throws Exception {
- final String topic = "persistent://my-property/my-ns/my-topic";
- final String subName = "my-subscription";
-
- Consumer<byte[]> consumerA = pulsarClient.newConsumer().topic(topic)
- .subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
- Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
- .subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
-
- Assert.assertEquals(consumerA, consumerB);
-
- consumerA.close();
- Assert.assertTrue(consumerA.isConnected());
- Assert.assertTrue(consumerB.isConnected());
-
- consumerB.close();
- Assert.assertFalse(consumerA.isConnected());
- Assert.assertFalse(consumerB.isConnected());
- }
}
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index e2c15e0..0e996cc 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -340,19 +340,6 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& con
lock.unlock();
callback(ResultInvalidConfiguration, Consumer());
return;
- } else if (conf.getConsumerType() == ConsumerShared) {
- ConsumersList consumers(consumers_);
- for (auto& weakPtr : consumers) {
- ConsumerImplBasePtr consumer = weakPtr.lock();
- if (consumer && consumer->getSubscriptionName() == consumerName &&
- consumer->getTopic() == topic && !consumer->isClosed()) {
- consumer->incrRefCount();
- lock.unlock();
- LOG_INFO("Reusing existing consumer instance for " << topic << " -- " << consumerName);
- callback(ResultOk, Consumer(consumer));
- return;
- }
- }
}
}
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 3fce8d2..7f36e65 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -109,10 +109,6 @@ Future<Result, ConsumerImplBaseWeakPtr> ConsumerImpl::getConsumerCreatedFuture()
return consumerCreatedPromise_.getFuture();
}
-void ConsumerImpl::incrRefCount() { ++refCount_; }
-
-unsigned int ConsumerImpl::safeDecrRefCount() { return refCount_ > 0 ? refCount_-- : refCount_; }
-
const std::string& ConsumerImpl::getSubscriptionName() const { return originalSubscriptionName_; }
const std::string& ConsumerImpl::getTopic() const { return topic_; }
@@ -815,14 +811,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
return;
}
- if (safeDecrRefCount() != 0) {
- lock.unlock();
- if (callback) {
- callback(ResultOk);
- }
- return;
- }
-
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
lock.unlock();
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 7e1066c..b5fe761 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -107,7 +107,6 @@ class ConsumerImpl : public ConsumerImplBase,
virtual bool isReadCompacted();
virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback);
- virtual void incrRefCount();
protected:
void connectionOpened(const ClientConnectionPtr& cnx);
@@ -141,7 +140,6 @@ class ConsumerImpl : public ConsumerImplBase,
void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
void failPendingReceiveCallback();
- unsigned int safeDecrRefCount();
Optional<MessageId> clearReceiveQueue();
@@ -171,7 +169,6 @@ class ConsumerImpl : public ConsumerImplBase,
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStatsImpl brokerConsumerStats_;
- unsigned int refCount_ = 0;
MessageCryptoPtr msgCrypto_;
const bool readCompacted_;
diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h
index e6be186..766b0a9 100644
--- a/pulsar-client-cpp/lib/ConsumerImplBase.h
+++ b/pulsar-client-cpp/lib/ConsumerImplBase.h
@@ -50,7 +50,6 @@ class ConsumerImplBase {
virtual int getNumOfPrefetchedMessages() const = 0;
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0;
- virtual void incrRefCount(){};
};
} // namespace pulsar
#endif // PULSAR_CONSUMER_IMPL_BASE_HEADER
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 3ee85db..8e4de24 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -2794,85 +2794,3 @@ TEST(BasicEndToEndTest, testPartitionedReceiveAsyncFailedConsumer) {
ASSERT_EQ(count, 0);
client.shutdown();
}
-
-TEST(BasicEndToEndTest, testPreventDupConsumersOnSharedMode) {
- ClientConfiguration config;
- Client client(lookupUrl);
- std::string subsName = "my-only-sub";
- std::string topicName = "persistent://public/default/test-prevent-dup-consumers";
- ConsumerConfiguration consumerConf;
- consumerConf.setConsumerType(ConsumerShared);
-
- Consumer consumerA;
- Result resultA = client.subscribe(topicName, subsName, consumerConf, consumerA);
- ASSERT_EQ(ResultOk, resultA);
- ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
-
- Consumer consumerB;
- Result resultB = client.subscribe(topicName, subsName, consumerConf, consumerB);
- ASSERT_EQ(ResultOk, resultB);
- ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
-
- // Since this is a shared consumer over same client cnx
- // closing consumerA should result in consumerB also being closed.
- ASSERT_EQ(ResultOk, consumerA.close());
- ASSERT_EQ(ResultOk, consumerB.close());
- ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
- ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
-}
-
-TEST(BasicEndToEndTest, testDupConsumersOnSharedModeNotThrowsExcOnUnsubscribe) {
- ClientConfiguration config;
- Client client(lookupUrl);
- std::string subsName = "my-only-sub";
- std::string topicName =
- "persistent://public/default/testDupConsumersOnSharedModeNotThrowsExcOnUnsubscribe";
- ConsumerConfiguration consumerConf;
- consumerConf.setConsumerType(ConsumerShared);
-
- Consumer consumerA;
- Result resultA = client.subscribe(topicName, subsName, consumerConf, consumerA);
- ASSERT_EQ(ResultOk, resultA);
- ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
-
- Consumer consumerB;
- Result resultB = client.subscribe(topicName, subsName, consumerConf, consumerB);
- ASSERT_EQ(ResultOk, resultB);
- ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
-
- ASSERT_EQ(ResultOk, consumerA.unsubscribe());
- // If dup consumers are allowed BrokerMetadataError will be the result of close()
- ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
-}
-
-TEST(BasicEndToEndTest, testPreventDupConsumersAllowSameSubForDifferentTopics) {
- ClientConfiguration config;
- Client client(lookupUrl);
- std::string subsName = "my-only-sub";
- std::string topicName =
- "persistent://public/default/testPreventDupConsumersAllowSameSubForDifferentTopics";
- ConsumerConfiguration consumerConf;
- consumerConf.setConsumerType(ConsumerShared);
-
- Consumer consumerA;
- Result resultA = client.subscribe(topicName, subsName, consumerConf, consumerA);
- ASSERT_EQ(ResultOk, resultA);
- ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
-
- Consumer consumerB;
- Result resultB = client.subscribe(topicName, subsName, consumerConf, consumerB);
- ASSERT_EQ(ResultOk, resultB);
- ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
-
- Consumer consumerC;
- Result resultC = client.subscribe(topicName + "-different-topic", subsName, consumerConf, consumerC);
- ASSERT_EQ(ResultOk, resultB);
- ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
- ASSERT_EQ(ResultOk, consumerA.close());
- ASSERT_EQ(ResultOk, consumerB.close());
- ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
- ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
-
- // consumer C should be a different instance from A and B and should be with open state.
- ASSERT_EQ(ResultOk, consumerC.close());
-}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 7132ed0..71fc310 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -61,7 +61,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected int maxReceiverQueueSize;
protected final Schema<T> schema;
protected final ConsumerInterceptors<T> interceptors;
- private int refCount = 0;
protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorService listenerExecutor,
@@ -380,12 +379,4 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
interceptors.onAcknowledgeCumulative(this, messageId, exception);
}
}
-
- protected synchronized void incrRefCount() {
- ++refCount;
- }
-
- protected synchronized boolean shouldTearDown() {
- return refCount > 0 ? refCount-- == 0 : refCount == 0;
- }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 266f483..4ecadfe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -681,10 +681,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
@Override
public CompletableFuture<Void> closeAsync() {
- if (!shouldTearDown()) {
- return CompletableFuture.completedFuture(null);
- }
-
if (getState() == State.Closing || getState() == State.Closed) {
unAckedMessageTracker.close();
if (possibleSendToDeadLetterTopicMessages != null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index aa6c671..a2027ad 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -311,11 +311,6 @@ public class PulsarClientImpl implements PulsarClient {
}
private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
- Optional<ConsumerBase<T>> subscriber = subscriptionExist(conf);
- if (subscriber.isPresent()) {
- return CompletableFuture.completedFuture(subscriber.get());
- }
-
CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
String topic = conf.getSingleTopic();
@@ -674,20 +669,6 @@ public class PulsarClientImpl implements PulsarClient {
});
}
- @SuppressWarnings("unchecked")
- private <T> Optional<ConsumerBase<T>> subscriptionExist(ConsumerConfigurationData<?> conf) {
- synchronized (consumers) {
- Optional<ConsumerBase<?>> subscriber = consumers.keySet().stream()
- .filter(c -> c.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
- .filter(c -> conf.getTopicNames().contains(c.getTopic()))
- .filter(c -> c.getSubscription().equals(conf.getSubscriptionName()))
- .filter(Consumer::isConnected)
- .findFirst();
- subscriber.ifPresent(ConsumerBase::incrRefCount);
- return subscriber.map(ConsumerBase.class::cast);
- }
- }
-
private static EventLoopGroup getEventLoopGroup(ClientConfigurationData conf) {
ThreadFactory threadFactory = getThreadFactory("pulsar-client-io");
return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index 877c927..d316be3 100644
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -301,17 +301,12 @@ public class PulsarSpoutTest extends ProducerConsumerBase {
otherSpout.open(Maps.newHashMap(), context, collector);
topicStats = admin.topics().getStats(topic);
- Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
+ Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 2);
otherSpout.close();
topicStats = admin.topics().getStats(topic);
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
-
- otherSpout.close();
-
- topicStats = admin.topics().getStats(topic);
- Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 0);
}
@Test