You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/12 04:43:41 UTC
[pulsar] branch master updated: Feature - implement reference count
for ConsumerImpl (#3795)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ff4db8d Feature - implement reference count for ConsumerImpl (#3795)
ff4db8d is described below
commit ff4db8db12be2eb79d910e2b286306298f71320e
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Tue Mar 12 01:43:36 2019 -0300
Feature - implement reference count for ConsumerImpl (#3795)
* Feature - implement reference count for ConsumerImpl
Add a reference count to ConsumerImpl in order to track how many times a
consumer instance returned by a `subscribe()` method call is being reused.
Tracking the references to a subscribed consumer instance makes it possible to
not close the consumer until the last corresponding `close()` is called.
Modifications:
- Add field on ConsumerBase to track references of consumer instances
subscribed by the user.
- Add a check in ConsumerImpl so that the close() action is only performed
when the reference count is zero.
- Increment the reference count when a previously built consumer instance is
reused by the caller.
Future considerations:
When optimization #3312 is extended to consumer implementations other than
ConsumerImpl, they should also add refCount checks to their close() methods.
* Add tests for reference count on ConsumerImpl
- Add test to verify ConsumerImpl reference count on close() method.
- Fix the test of the duplicate consumers feature to account for the refcount.
---
.../client/api/SimpleProducerConsumerTest.java | 27 ++++++++++++++++++++--
.../apache/pulsar/client/impl/ConsumerBase.java | 10 +++++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 4 ++++
.../pulsar/client/impl/PulsarClientImpl.java | 1 +
.../org/apache/pulsar/storm/PulsarSpoutTest.java | 5 ++++
5 files changed, 44 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 0f3a5b6..06dacd8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.api;
-import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
@@ -65,7 +64,6 @@ import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageCrypto;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -3076,4 +3074,29 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
Assert.assertTrue(consumerC.isConnected());
consumerC.close();
}
+
+ @Test
+ public void testRefCount_OnCloseConsumer() throws Exception {
+ final String topic = "persistent://my-property/my-ns/my-topic";
+ final String subName = "my-subscription";
+
+ Consumer<byte[]> consumerA = pulsarClient.newConsumer().topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+ Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ Assert.assertEquals(consumerA, consumerB);
+
+ consumerA.close();
+ Assert.assertTrue(consumerA.isConnected());
+ Assert.assertTrue(consumerB.isConnected());
+
+ consumerB.close();
+ Assert.assertFalse(consumerA.isConnected());
+ Assert.assertFalse(consumerB.isConnected());
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 770ff2b..11d7543 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -59,8 +59,9 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
final BlockingQueue<Message<T>> incomingMessages;
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
protected int maxReceiverQueueSize;
- protected Schema<T> schema;
+ protected final Schema<T> schema;
protected final ConsumerInterceptors<T> interceptors;
+ private int refCount = 0;
protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorService listenerExecutor,
@@ -385,4 +386,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
}
}
+ protected synchronized void incrRefCount() {
+ ++refCount;
+ }
+
+ protected synchronized boolean shouldTearDown() {
+ return refCount > 0 ? refCount-- == 0 : refCount == 0;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b75388a..499562f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -624,6 +624,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
@Override
public CompletableFuture<Void> closeAsync() {
+ if (!shouldTearDown()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
if (getState() == State.Closing || getState() == State.Closed) {
unAckedMessageTracker.close();
if (possibleSendToDeadLetterTopicMessages != null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 257e627..2e92c50 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -684,6 +684,7 @@ public class PulsarClientImpl implements PulsarClient {
.filter(c -> c.getSubscription().equals(conf.getSubscriptionName()))
.filter(Consumer::isConnected)
.findFirst();
+ subscriber.ifPresent(ConsumerBase::incrRefCount);
return subscriber.map(ConsumerBase.class::cast);
}
}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index 461a6d7..d8aa539 100644
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -306,6 +306,11 @@ public class PulsarSpoutTest extends ProducerConsumerBase {
otherSpout.close();
topicStats = admin.topics().getStats(topic);
+ Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
+
+ otherSpout.close();
+
+ topicStats = admin.topics().getStats(topic);
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 0);
}