You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/12/03 22:11:43 UTC

[pulsar] branch master updated: Add pull-mode on Websockets (#3058)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 66a68d5  Add pull-mode on Websockets (#3058)
66a68d5 is described below

commit 66a68d58172ce06930ec84a9b0e41944a2c8e5b6
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Mon Dec 3 23:11:38 2018 +0100

    Add pull-mode on Websockets (#3058)
    
    * Add pull-mode on Websockets
    
    Fix #3052
    
    * Add WebSocket pull-mode test
    
    * Add doc on WebSocket pull mode
---
 .../websocket/proxy/ProxyPublishConsumeTest.java   | 60 ++++++++++++++++++++++
 .../websocket/proxy/SimpleConsumerSocket.java      |  7 +++
 .../apache/pulsar/websocket/ConsumerHandler.java   | 55 +++++++++++++-------
 .../{ConsumerAck.java => ConsumerCommand.java}     | 11 ++--
 site2/docs/client-libraries-websocket.md           | 32 ++++++++++++
 5 files changed, 138 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 7f1b5aa..9ddf955 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -465,6 +465,66 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
         }
     }
 
+    @Test(timeOut = 10000)
+    public void socketPullModeTest() throws Exception {
+        final String topic = "my-property/my-ns/my-topic8";
+        final String subscription = "my-sub";
+        final String consumerUri = String.format(
+                "ws://localhost:%d/ws/v2/consumer/persistent/%s/%s?pullMode=true&subscriptionType=Shared",
+                port, topic, subscription
+        );
+        final String producerUri = String.format("ws://localhost:%d/ws/v2/producer/persistent/%s", port, topic);
+
+        URI consumeUri = URI.create(consumerUri);
+        URI produceUri = URI.create(producerUri);
+
+        WebSocketClient consumeClient1 = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
+        WebSocketClient consumeClient2 = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
+        WebSocketClient produceClient = new WebSocketClient();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        try {
+            consumeClient1.start();
+            consumeClient2.start();
+            ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
+            ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest();
+            Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
+            Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2);
+            log.info("Connecting to : {}", consumeUri);
+
+            // let it connect
+            Assert.assertTrue(consumerFuture1.get().isOpen());
+            Assert.assertTrue(consumerFuture2.get().isOpen());
+
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            produceClient.start();
+            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+            Assert.assertTrue(producerFuture.get().isOpen());
+            produceSocket.sendMessage(100);
+
+            Thread.sleep(500);
+
+            // Verify no messages received despite production
+            Assert.assertEquals(consumeSocket1.getReceivedMessagesCount(), 0);
+            Assert.assertEquals(consumeSocket2.getReceivedMessagesCount(), 0);
+
+            consumeSocket1.sendPermits(3);
+            consumeSocket2.sendPermits(2);
+            consumeSocket2.sendPermits(2);
+            consumeSocket2.sendPermits(2);
+
+            Thread.sleep(500);
+
+            Assert.assertEquals(consumeSocket1.getReceivedMessagesCount(), 3);
+            Assert.assertEquals(consumeSocket2.getReceivedMessagesCount(), 6);
+
+        } finally {
+            stopWebSocketClient(consumeClient1, consumeClient2, produceClient);
+        }
+    }
+
     private void verifyTopicStat(Client client, String baseUrl, String topic) {
         String statUrl = baseUrl + topic + "/stats";
         WebTarget webTarget = client.target(statUrl);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
index 09b29c4..c303ed5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
@@ -81,6 +81,13 @@ public class SimpleConsumerSocket {
         this.getRemote().sendString(ack.toString());
     }
 
+    public void sendPermits(int nbPermits) throws IOException {
+        JsonObject permitMessage = new JsonObject();
+        permitMessage.add("type", new JsonPrimitive("permit"));
+        permitMessage.add("permitMessages", new JsonPrimitive(nbPermits));
+        this.getRemote().sendString(permitMessage.toString());
+    }
+
     public RemoteEndpoint getRemote() {
         return this.session.getRemote();
     }
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 4736cbe..d8993d1 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -47,7 +47,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.websocket.data.ConsumerAck;
+import org.apache.pulsar.websocket.data.ConsumerCommand;
 import org.apache.pulsar.websocket.data.ConsumerMessage;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.WriteCallback;
@@ -72,8 +72,9 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
     private SubscriptionType subscriptionType;
     private Consumer<byte[]> consumer;
 
-    private int maxPendingMessages;
+    private int maxPendingMessages = 0;
     private final AtomicInteger pendingMessages = new AtomicInteger();
+    private final boolean pullMode;
 
     private final LongAdder numMsgsDelivered;
     private final LongAdder numBytesDelivered;
@@ -90,13 +91,17 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
         this.numMsgsDelivered = new LongAdder();
         this.numBytesDelivered = new LongAdder();
         this.numMsgsAcked = new LongAdder();
+        this.pullMode = Boolean.valueOf(queryParams.get("pullMode"));
 
         try {
             // checkAuth() and getConsumerConfiguration() should be called after assigning a value to this.subscription
             this.subscription = extractSubscription(request);
             builder = (ConsumerBuilderImpl<byte[]>) getConsumerConfiguration(service.getPulsarClient());
-            this.maxPendingMessages = (builder.getConf().getReceiverQueueSize() == 0) ? 1
-                    : builder.getConf().getReceiverQueueSize();
+
+            if (!this.pullMode) {
+                this.maxPendingMessages = (builder.getConf().getReceiverQueueSize() == 0) ? 1
+                        : builder.getConf().getReceiverQueueSize();
+            }
             this.subscriptionType = builder.getConf().getSubscriptionType();
 
             if (!checkAuth(response)) {
@@ -209,31 +214,43 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
     @Override
     public void onWebSocketConnect(Session session) {
         super.onWebSocketConnect(session);
-        receiveMessage();
+        if (!pullMode) {
+            receiveMessage();
+        }
     }
 
     @Override
     public void onWebSocketText(String message) {
         super.onWebSocketText(message);
 
-        // We should have received an ack
-
-        MessageId msgId;
         try {
-            ConsumerAck ack = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerAck.class);
-            msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(ack.messageId), topic);
+            ConsumerCommand command = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerCommand.class);
+            if ("permit".equals(command.type)) {
+                if (command.permitMessages == null) {
+                    throw new IOException("Missing required permitMessages field for 'permit' command");
+                }
+                if (this.pullMode) {
+                    int pending = pendingMessages.getAndAdd(-command.permitMessages);
+                    if (pending >= 0) {
+                        // Resume delivery
+                        receiveMessage();
+                    }
+                }
+            } else {
+                // We should have received an ack
+                MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId), topic);
+                consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
+                if (!this.pullMode) {
+                    int pending = pendingMessages.getAndDecrement();
+                    if (pending >= maxPendingMessages) {
+                        // Resume delivery
+                        receiveMessage();
+                    }
+                }
+            }
         } catch (IOException e) {
             log.warn("Failed to deserialize message id: {}", message, e);
             close(WebSocketError.FailedToDeserializeFromJSON);
-            return;
-        }
-
-        consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
-
-        int pending = pendingMessages.getAndDecrement();
-        if (pending >= maxPendingMessages) {
-            // Resume delivery
-            receiveMessage();
         }
     }
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerAck.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerCommand.java
similarity index 85%
rename from pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerAck.java
rename to pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerCommand.java
index a7493fb..88112a8 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerAck.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerCommand.java
@@ -18,13 +18,8 @@
  */
 package org.apache.pulsar.websocket.data;
 
-public class ConsumerAck {
+public class ConsumerCommand {
+    public String type;
     public String messageId;
-
-    public ConsumerAck() {
-    }
-
-    public ConsumerAck(String messageId) {
-        this.messageId = messageId;
-    }
+    public Integer permitMessages;
 }
diff --git a/site2/docs/client-libraries-websocket.md b/site2/docs/client-libraries-websocket.md
index 886604f..e2fbaf9 100644
--- a/site2/docs/client-libraries-websocket.md
+++ b/site2/docs/client-libraries-websocket.md
@@ -144,6 +144,11 @@ Key | Type | Required? | Explanation
 `priorityLevel` | int | no | Define a [priority](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setPriorityLevel-int-) for the consumer
 `maxRedeliverCount` | int | no | Define a [maxRedeliverCount](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: 0). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
 `deadLetterTopic` | string | no | Define a [deadLetterTopic](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: {topic}-{subscription}-DLQ). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
+`pullMode` | boolean | no | Enable pull mode (default: false). See "Flow Control" below.
+
+NB: these parameters (except `pullMode`) apply to the internal consumer of the WebSocket service.
+So messages will be subject to the redelivery settings as soon as they get into the receive queue,
+even if the client doesn't consume on the WebSocket.
 
 ##### Receiving messages
 
@@ -181,6 +186,33 @@ Key | Type | Required? | Explanation
 :---|:-----|:----------|:-----------
 `messageId`| string | yes | Message ID of the processed message
 
+#### Flow control
+
+##### Push Mode
+
+By default (`pullMode=false`), the consumer endpoint will use the `receiverQueueSize` parameter both to size its
+internal receive queue and to limit the number of unacknowledged messages that are passed to the WebSocket client.
+In this mode, if you don't send acknowledgements, the Pulsar WebSocket service will stop sending messages after reaching
+`receiverQueueSize` unacked messages sent to the WebSocket client.
+
+##### Pull Mode
+
+If you set `pullMode` to `true`, the WebSocket client will need to send `permit` commands to permit the
+Pulsar WebSocket service to send more messages.
+
+```json
+{
+  "type": "permit",
+  "permitMessages": 100
+}
+```
+
+Key | Type | Required? | Explanation
+:---|:-----|:----------|:-----------
+`type`| string | yes | Type of command. Must be `permit`
+`permitMessages`| int | yes | Number of messages to permit
+
+NB: in this mode it's possible to acknowledge messages in a different connection.
 
 ### Reader endpoint