You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/23 22:53:43 UTC
[incubator-pulsar] branch master updated: Remove reference to consumers in client map when unsubscribing (#1836)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3988ca3 Remove reference to consumers in client map when unsubscribing (#1836)
3988ca3 is described below
commit 3988ca394beacc6dd417b40f670213ba27078d6b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed May 23 15:53:40 2018 -0700
Remove reference to consumers in client map when unsubscribing (#1836)
---
.../client/impl/ConsumerUnsubscribeTest.java | 64 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 5 +-
.../pulsar/client/impl/PulsarClientImpl.java | 15 +++++
3 files changed, 82 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerUnsubscribeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerUnsubscribeTest.java
new file mode 100644
index 0000000..9061da0
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerUnsubscribeTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MockBrokerService;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that unsubscribing a consumer also removes it from the consumers tracked by
+ * {@link PulsarClientImpl}, so the client reports zero consumers afterwards.
+ */
+public class ConsumerUnsubscribeTest {
+
+    private MockBrokerService mockBrokerService;
+    private static final int WEB_SERVICE_PORT = PortManager.nextFreePort();
+    private static final int WEB_SERVICE_TLS_PORT = PortManager.nextFreePort();
+    private static final int BROKER_SERVICE_PORT = PortManager.nextFreePort();
+    private static final int BROKER_SERVICE_TLS_PORT = PortManager.nextFreePort();
+
+    /** Creates and starts the mock broker on the ports allocated above. */
+    @BeforeClass
+    public void setup() {
+        mockBrokerService = new MockBrokerService(WEB_SERVICE_PORT, WEB_SERVICE_TLS_PORT, BROKER_SERVICE_PORT,
+                BROKER_SERVICE_TLS_PORT);
+        mockBrokerService.start();
+    }
+
+    /** Stops the mock broker started in {@link #setup()}. */
+    @AfterClass
+    public void teardown() {
+        mockBrokerService.stop();
+    }
+
+    /**
+     * Subscribes a single consumer, unsubscribes it, and checks that the client no longer
+     * counts any consumer.
+     *
+     * @throws Exception if building the client, subscribing, unsubscribing or closing fails
+     */
+    @Test
+    public void testConsumerUnsubscribeReference() throws Exception {
+        PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
+                .serviceUrl("pulsar://127.0.0.1:" + BROKER_SERVICE_PORT).build();
+        try {
+            Consumer<?> consumer = client.newConsumer().topic("persistent://public/default/t1")
+                    .subscriptionName("sub1").subscribe();
+            consumer.unsubscribe();
+
+            assertEquals(client.consumersCount(), 0);
+        } finally {
+            // Close on every path: a thrown exception or a failed assertion must not leave
+            // the client open for the rest of the test run.
+            client.close();
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 38950b6..056b17e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -213,8 +213,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
ClientCnx cnx = cnx();
cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> {
cnx.removeConsumer(consumerId);
- log.info("[{}][{}] Successfully unsubscribed from topic", topic, subscription);
unAckedMessageTracker.close();
+ client.cleanupConsumer(ConsumerImpl.this);
+ log.info("[{}][{}] Successfully unsubscribed from topic", topic, subscription);
unsubscribeFuture.complete(null);
setState(State.Closed);
}).exceptionally(e -> {
@@ -1350,7 +1351,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
void connectionClosed(ClientCnx cnx) {
this.connectionHandler.connectionClosed(cnx);
}
-
+
@VisibleForTesting
public ClientCnx getClientCnx() {
return this.connectionHandler.getClientCnx();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 5360a8a..c504ace 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
import static org.apache.commons.lang3.StringUtils.isBlank;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.channel.EventLoopGroup;
@@ -698,4 +699,18 @@ public class PulsarClientImpl implements PulsarClient {
consumers.remove(consumer);
}
}
+
+ /**
+ * Returns the current size of {@code producers}, read while holding that
+ * collection's monitor. Package-private and annotated for test-only visibility.
+ *
+ * @return the value of {@code producers.size()} at the time of the call
+ */
+ @VisibleForTesting
+ int producersCount() {
+ synchronized (producers) {
+ return producers.size();
+ }
+ }
+
+ /**
+ * Returns the current size of {@code consumers}, read while holding that
+ * collection's monitor. Package-private and annotated for test-only visibility;
+ * {@code ConsumerUnsubscribeTest} uses it to check that an unsubscribed consumer
+ * is no longer counted.
+ *
+ * @return the value of {@code consumers.size()} at the time of the call
+ */
+ @VisibleForTesting
+ int consumersCount() {
+ synchronized (consumers) {
+ return consumers.size();
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.