You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/09/24 09:29:18 UTC

[pulsar] branch master updated: [Issue 5005] WebSocket API: support unsubscribing from a topic (#8068)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ce8d9e0  [Issue 5005] WebSocket API: support unsubscribing from a topic (#8068)
ce8d9e0 is described below

commit ce8d9e04f12f1d092467cba58a750048640c355e
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Sep 24 17:29:06 2020 +0800

    [Issue 5005] WebSocket API: support unsubscribing from a topic (#8068)
    
    Fixes #5005
    
    ### Motivation
    The WebSocket API doesn't support unsubscribing from a topic, which leaves many subscriptions around that need to be cleaned up with pulsar-admin.
---
 .../websocket/proxy/ProxyPublishConsumeTest.java   | 36 ++++++++++++++
 .../websocket/proxy/SimpleConsumerSocket.java      |  6 +++
 .../apache/pulsar/websocket/ConsumerHandler.java   | 57 ++++++++++++++--------
 3 files changed, 78 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 26b12b8..2518333 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -66,6 +66,7 @@ import org.glassfish.jersey.client.ClientConfig;
 import org.glassfish.jersey.logging.LoggingFeature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -181,6 +182,41 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
         }
     }
 
+    @Test
+    public void unsubscribeTest() throws Exception {
+        final String namespace = "my-property/my-ns";
+        final String topic = namespace + "/" + "my-topic7";
+        final String topicName = "persistent://" + topic + System.nanoTime();
+        admin.topics().createPartitionedTopic(topicName, 3);
+
+        final String subscription = "my-sub";
+        final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/" + topic + "/" + subscription;
+
+        URI consumeUri = URI.create(consumerUri);
+        WebSocketClient consumeClient = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+        Thread.sleep(500);
+
+        try {
+            // setup a consumer
+            consumeClient.start();
+            ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+            Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
+            consumerFuture.get();
+            List<String> subs = admin.topics().getSubscriptions(topic);
+            Assert.assertEquals(subs.size(), 1);
+            Assert.assertEquals(subs.get(0), subscription);
+            // do unsubscribe
+            consumeSocket.unsubscribe();
+            //wait for delete
+            Thread.sleep(1000);
+            subs = admin.topics().getSubscriptions(topic);
+            Assert.assertEquals(subs.size(), 0);
+        } finally {
+            stopWebSocketClient(consumeClient);
+        }
+    }
+
     @Test(timeOut = 10000)
     public void emptySubcriptionConsumerTest() throws Exception {
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
index c303ed5..897c72d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
@@ -88,6 +88,12 @@ public class SimpleConsumerSocket {
         this.getRemote().sendString(permitMessage.toString());
     }
 
+    public void unsubscribe() throws IOException {
+        JsonObject message = new JsonObject();
+        message.add("type", new JsonPrimitive("unsubscribe"));
+        this.getRemote().sendString(message.toString());
+    }
+
     public RemoteEndpoint getRemote() {
         return this.session.getRemote();
     }
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index b92bf3c..35b2b0c 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
 import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
 import org.apache.pulsar.client.api.SubscriptionMode;
@@ -230,28 +231,11 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
         try {
             ConsumerCommand command = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerCommand.class);
             if ("permit".equals(command.type)) {
-                if (command.permitMessages == null) {
-                    throw new IOException("Missing required permitMessages field for 'permit' command");
-                }
-                if (this.pullMode) {
-                    int pending = pendingMessages.getAndAdd(-command.permitMessages);
-                    if (pending >= 0) {
-                        // Resume delivery
-                        receiveMessage();
-                    }
-                }
+                handlePermit(command);
+            } else if ("unsubscribe".equals(command.type)) {
+                handleUnsubscribe(command);
             } else {
-                // We should have received an ack
-                MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
-                        topic.toString());
-                consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
-                if (!this.pullMode) {
-                    int pending = pendingMessages.getAndDecrement();
-                    if (pending >= maxPendingMessages) {
-                        // Resume delivery
-                        receiveMessage();
-                    }
-                }
+                handleAck(command);
             }
         } catch (IOException e) {
             log.warn("Failed to deserialize message id: {}", message, e);
@@ -259,6 +243,37 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
         }
     }
 
+    private void handleUnsubscribe(ConsumerCommand command) throws PulsarClientException {
+        consumer.unsubscribe();
+    }
+
+    private void handleAck(ConsumerCommand command) throws IOException {
+        // We should have received an ack
+        MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
+                topic.toString());
+        consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
+        if (!this.pullMode) {
+            int pending = pendingMessages.getAndDecrement();
+            if (pending >= maxPendingMessages) {
+                // Resume delivery
+                receiveMessage();
+            }
+        }
+    }
+
+    private void handlePermit(ConsumerCommand command) throws IOException {
+        if (command.permitMessages == null) {
+            throw new IOException("Missing required permitMessages field for 'permit' command");
+        }
+        if (this.pullMode) {
+            int pending = pendingMessages.getAndAdd(-command.permitMessages);
+            if (pending >= 0) {
+                // Resume delivery
+                receiveMessage();
+            }
+        }
+    }
+
     @Override
     public void close() throws IOException {
         if (consumer != null) {