You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/23 22:53:43 UTC

[incubator-pulsar] branch master updated: Remove reference to consumers in client map when unsubscribing (#1836)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3988ca3  Remove reference to consumers in client map when unsubscribing (#1836)
3988ca3 is described below

commit 3988ca394beacc6dd417b40f670213ba27078d6b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed May 23 15:53:40 2018 -0700

    Remove reference to consumers in client map when unsubscribing (#1836)
---
 .../client/impl/ConsumerUnsubscribeTest.java       | 64 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  5 +-
 .../pulsar/client/impl/PulsarClientImpl.java       | 15 +++++
 3 files changed, 82 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerUnsubscribeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerUnsubscribeTest.java
new file mode 100644
index 0000000..9061da0
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerUnsubscribeTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MockBrokerService;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ */
+public class ConsumerUnsubscribeTest {
+
+    MockBrokerService mockBrokerService;
+    private static final int WEB_SERVICE_PORT = PortManager.nextFreePort();
+    private static final int WEB_SERVICE_TLS_PORT = PortManager.nextFreePort();
+    private static final int BROKER_SERVICE_PORT = PortManager.nextFreePort();
+    private static final int BROKER_SERVICE_TLS_PORT = PortManager.nextFreePort();
+
+    @BeforeClass
+    public void setup() {
+        mockBrokerService = new MockBrokerService(WEB_SERVICE_PORT, WEB_SERVICE_TLS_PORT, BROKER_SERVICE_PORT,
+                BROKER_SERVICE_TLS_PORT);
+        mockBrokerService.start();
+    }
+
+    @AfterClass
+    public void teardown() {
+        mockBrokerService.stop();
+    }
+
+    @Test
+    public void testConsumerUnsubscribeReference() throws Exception {
+        PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
+                .serviceUrl("pulsar://127.0.0.1:" + BROKER_SERVICE_PORT).build();
+
+        Consumer<?> consumer = client.newConsumer().topic("persistent://public/default/t1").subscriptionName("sub1").subscribe();
+        consumer.unsubscribe();
+
+        assertEquals(client.consumersCount(), 0);
+        client.close();
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 38950b6..056b17e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -213,8 +213,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             ClientCnx cnx = cnx();
             cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> {
                 cnx.removeConsumer(consumerId);
-                log.info("[{}][{}] Successfully unsubscribed from topic", topic, subscription);
                 unAckedMessageTracker.close();
+                client.cleanupConsumer(ConsumerImpl.this);
+                log.info("[{}][{}] Successfully unsubscribed from topic", topic, subscription);
                 unsubscribeFuture.complete(null);
                 setState(State.Closed);
             }).exceptionally(e -> {
@@ -1350,7 +1351,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     void connectionClosed(ClientCnx cnx) {
         this.connectionHandler.connectionClosed(cnx);
     }
-    
+
     @VisibleForTesting
     public ClientCnx getClientCnx() {
         return this.connectionHandler.getClientCnx();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 5360a8a..c504ace 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import io.netty.channel.EventLoopGroup;
@@ -698,4 +699,18 @@ public class PulsarClientImpl implements PulsarClient {
             consumers.remove(consumer);
         }
     }
+
+    @VisibleForTesting
+    int producersCount() {
+        synchronized (producers) {
+            return producers.size();
+        }
+    }
+
+    @VisibleForTesting
+    int consumersCount() {
+        synchronized (consumers) {
+            return consumers.size();
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.