You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/09/24 09:29:18 UTC
[pulsar] branch master updated: [Issue 5005] WebSocket API: support unsubscribing from a topic (#8068)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ce8d9e0 [Issue 5005] WebSocket API: support unsubscribing from a topic (#8068)
ce8d9e0 is described below
commit ce8d9e04f12f1d092467cba58a750048640c355e
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Sep 24 17:29:06 2020 +0800
[Issue 5005] WebSocket API: support unsubscribing from a topic (#8068)
Fixes #5005
### Motivation
The WebSocket API does not support unsubscribing from a topic, so unused subscriptions accumulate and have to be cleaned up manually with pulsar-admin.
---
.../websocket/proxy/ProxyPublishConsumeTest.java | 36 ++++++++++++++
.../websocket/proxy/SimpleConsumerSocket.java | 6 +++
.../apache/pulsar/websocket/ConsumerHandler.java | 57 ++++++++++++++--------
3 files changed, 78 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 26b12b8..2518333 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -66,6 +66,7 @@ import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.logging.LoggingFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -181,6 +182,41 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
}
}
+ @Test
+ public void unsubscribeTest() throws Exception {
+ final String namespace = "my-property/my-ns";
+ final String topic = namespace + "/" + "my-topic7";
+ final String topicName = "persistent://" + topic + System.nanoTime();
+ admin.topics().createPartitionedTopic(topicName, 3);
+
+ final String subscription = "my-sub";
+ final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/" + topic + "/" + subscription;
+
+ URI consumeUri = URI.create(consumerUri);
+ WebSocketClient consumeClient = new WebSocketClient();
+ SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+ Thread.sleep(500);
+
+ try {
+ // setup a consumer
+ consumeClient.start();
+ ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+ Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
+ consumerFuture.get();
+ List<String> subs = admin.topics().getSubscriptions(topic);
+ Assert.assertEquals(subs.size(), 1);
+ Assert.assertEquals(subs.get(0), subscription);
+ // do unsubscribe
+ consumeSocket.unsubscribe();
+ //wait for delete
+ Thread.sleep(1000);
+ subs = admin.topics().getSubscriptions(topic);
+ Assert.assertEquals(subs.size(), 0);
+ } finally {
+ stopWebSocketClient(consumeClient);
+ }
+ }
+
@Test(timeOut = 10000)
public void emptySubcriptionConsumerTest() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
index c303ed5..897c72d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
@@ -88,6 +88,12 @@ public class SimpleConsumerSocket {
this.getRemote().sendString(permitMessage.toString());
}
+ public void unsubscribe() throws IOException {
+ JsonObject message = new JsonObject();
+ message.add("type", new JsonPrimitive("unsubscribe"));
+ this.getRemote().sendString(message.toString());
+ }
+
public RemoteEndpoint getRemote() {
return this.session.getRemote();
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index b92bf3c..35b2b0c 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
import org.apache.pulsar.client.api.SubscriptionMode;
@@ -230,28 +231,11 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
try {
ConsumerCommand command = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerCommand.class);
if ("permit".equals(command.type)) {
- if (command.permitMessages == null) {
- throw new IOException("Missing required permitMessages field for 'permit' command");
- }
- if (this.pullMode) {
- int pending = pendingMessages.getAndAdd(-command.permitMessages);
- if (pending >= 0) {
- // Resume delivery
- receiveMessage();
- }
- }
+ handlePermit(command);
+ } else if ("unsubscribe".equals(command.type)) {
+ handleUnsubscribe(command);
} else {
- // We should have received an ack
- MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
- topic.toString());
- consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
- if (!this.pullMode) {
- int pending = pendingMessages.getAndDecrement();
- if (pending >= maxPendingMessages) {
- // Resume delivery
- receiveMessage();
- }
- }
+ handleAck(command);
}
} catch (IOException e) {
log.warn("Failed to deserialize message id: {}", message, e);
@@ -259,6 +243,37 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
}
}
+ private void handleUnsubscribe(ConsumerCommand command) throws PulsarClientException {
+ consumer.unsubscribe();
+ }
+
+ private void handleAck(ConsumerCommand command) throws IOException {
+ // We should have received an ack
+ MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
+ topic.toString());
+ consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
+ if (!this.pullMode) {
+ int pending = pendingMessages.getAndDecrement();
+ if (pending >= maxPendingMessages) {
+ // Resume delivery
+ receiveMessage();
+ }
+ }
+ }
+
+ private void handlePermit(ConsumerCommand command) throws IOException {
+ if (command.permitMessages == null) {
+ throw new IOException("Missing required permitMessages field for 'permit' command");
+ }
+ if (this.pullMode) {
+ int pending = pendingMessages.getAndAdd(-command.permitMessages);
+ if (pending >= 0) {
+ // Resume delivery
+ receiveMessage();
+ }
+ }
+ }
+
@Override
public void close() throws IOException {
if (consumer != null) {