You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/12 04:43:41 UTC

[pulsar] branch master updated: Feature - implement reference count for ConsumerImpl (#3795)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ff4db8d  Feature - implement reference count for ConsumerImpl (#3795)
ff4db8d is described below

commit ff4db8db12be2eb79d910e2b286306298f71320e
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Tue Mar 12 01:43:36 2019 -0300

    Feature - implement reference count for ConsumerImpl (#3795)
    
    * Feature - implement reference count for ConsumerImpl
    
    Add reference count for ConsumerImpl in order to track reused instances of a
    consumer instance returned by `subscribe()` method call.
    Having the reference of subscribed consumer instances offers the ability to not
    close a consumer until the last corresponding `close()` is being called.
    
    Modifications:
    
      - Add field on ConsumerBase to track references of consumer instances
        subscribed by the user.
      - Add checks on ConsumerImpl to know whether close() action should be
        performed regarding of reference count being zero value.
      - Increment reference count when a previous built consumer instance is being
        used by caller.
    
    Future considerations:
    
    When optimization #3312 is going to be made for other consumers implementation
    aside from ConsumerImpl it should add refCount checks on close() method call.
    
    * Add tests for reference count on ConsumerImpl
    
      - Add test to verify ConsumerImpl reference count on close() method.
      - Fix test from dup consumers feature with refcount.
---
 .../client/api/SimpleProducerConsumerTest.java     | 27 ++++++++++++++++++++--
 .../apache/pulsar/client/impl/ConsumerBase.java    | 10 +++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  4 ++++
 .../pulsar/client/impl/PulsarClientImpl.java       |  1 +
 .../org/apache/pulsar/storm/PulsarSpoutTest.java   |  5 ++++
 5 files changed, 44 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 0f3a5b6..06dacd8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.api;
 
-import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.spy;
@@ -65,7 +64,6 @@ import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageCrypto;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -3076,4 +3074,29 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         Assert.assertTrue(consumerC.isConnected());
         consumerC.close();
     }
+
+    @Test
+    public void testRefCount_OnCloseConsumer() throws Exception {
+        final String topic = "persistent://my-property/my-ns/my-topic";
+        final String subName = "my-subscription";
+
+        Consumer<byte[]> consumerA = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+        Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        Assert.assertEquals(consumerA, consumerB);
+
+        consumerA.close();
+        Assert.assertTrue(consumerA.isConnected());
+        Assert.assertTrue(consumerB.isConnected());
+
+        consumerB.close();
+        Assert.assertFalse(consumerA.isConnected());
+        Assert.assertFalse(consumerB.isConnected());
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 770ff2b..11d7543 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -59,8 +59,9 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     final BlockingQueue<Message<T>> incomingMessages;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
     protected int maxReceiverQueueSize;
-    protected Schema<T> schema;
+    protected final Schema<T> schema;
     protected final ConsumerInterceptors<T> interceptors;
+    private int refCount = 0;
 
     protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                            int receiverQueueSize, ExecutorService listenerExecutor,
@@ -385,4 +386,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         }
     }
 
+    protected synchronized void incrRefCount() {
+        ++refCount;
+    }
+
+    protected synchronized boolean shouldTearDown() {
+        return refCount > 0 ? refCount-- == 0 : refCount == 0;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b75388a..499562f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -624,6 +624,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     @Override
     public CompletableFuture<Void> closeAsync() {
+        if (!shouldTearDown()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
         if (getState() == State.Closing || getState() == State.Closed) {
             unAckedMessageTracker.close();
             if (possibleSendToDeadLetterTopicMessages != null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 257e627..2e92c50 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -684,6 +684,7 @@ public class PulsarClientImpl implements PulsarClient {
                     .filter(c -> c.getSubscription().equals(conf.getSubscriptionName()))
                     .filter(Consumer::isConnected)
                     .findFirst();
+            subscriber.ifPresent(ConsumerBase::incrRefCount);
             return subscriber.map(ConsumerBase.class::cast);
         }
     }
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index 461a6d7..d8aa539 100644
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -306,6 +306,11 @@ public class PulsarSpoutTest extends ProducerConsumerBase {
         otherSpout.close();
 
         topicStats = admin.topics().getStats(topic);
+        Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
+
+        otherSpout.close();
+
+        topicStats = admin.topics().getStats(topic);
         Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 0);
     }