You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/03 22:11:41 UTC

[GitHub] merlimat closed pull request #3058: Add pull-mode on Websockets

merlimat closed pull request #3058: Add pull-mode on Websockets
URL: https://github.com/apache/pulsar/pull/3058
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 7f1b5aa45c..9ddf955484 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -465,6 +465,66 @@ public void consumeMessagesInPartitionedTopicTest() throws Exception {
         }
     }
 
+    @Test(timeOut = 10000)
+    public void socketPullModeTest() throws Exception {
+        final String topic = "my-property/my-ns/my-topic8";
+        final String subscription = "my-sub";
+        final String consumerUri = String.format(
+                "ws://localhost:%d/ws/v2/consumer/persistent/%s/%s?pullMode=true&subscriptionType=Shared",
+                port, topic, subscription
+        );
+        final String producerUri = String.format("ws://localhost:%d/ws/v2/producer/persistent/%s", port, topic);
+
+        URI consumeUri = URI.create(consumerUri);
+        URI produceUri = URI.create(producerUri);
+
+        WebSocketClient consumeClient1 = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
+        WebSocketClient consumeClient2 = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
+        WebSocketClient produceClient = new WebSocketClient();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        try {
+            consumeClient1.start();
+            consumeClient2.start();
+            ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
+            ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest();
+            Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
+            Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2);
+            log.info("Connecting to : {}", consumeUri);
+
+            // let it connect
+            Assert.assertTrue(consumerFuture1.get().isOpen());
+            Assert.assertTrue(consumerFuture2.get().isOpen());
+
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            produceClient.start();
+            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+            Assert.assertTrue(producerFuture.get().isOpen());
+            produceSocket.sendMessage(100);
+
+            Thread.sleep(500);
+
+            // Verify no messages received despite production
+            Assert.assertEquals(consumeSocket1.getReceivedMessagesCount(), 0);
+            Assert.assertEquals(consumeSocket2.getReceivedMessagesCount(), 0);
+
+            consumeSocket1.sendPermits(3);
+            consumeSocket2.sendPermits(2);
+            consumeSocket2.sendPermits(2);
+            consumeSocket2.sendPermits(2);
+
+            Thread.sleep(500);
+
+            Assert.assertEquals(consumeSocket1.getReceivedMessagesCount(), 3);
+            Assert.assertEquals(consumeSocket2.getReceivedMessagesCount(), 6);
+
+        } finally {
+            stopWebSocketClient(consumeClient1, consumeClient2, produceClient);
+        }
+    }
+
     private void verifyTopicStat(Client client, String baseUrl, String topic) {
         String statUrl = baseUrl + topic + "/stats";
         WebTarget webTarget = client.target(statUrl);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
index 09b29c47db..c303ed50c5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
@@ -81,6 +81,13 @@ public synchronized void onMessage(String msg) throws JsonParseException, IOExce
         this.getRemote().sendString(ack.toString());
     }
 
+    public void sendPermits(int nbPermits) throws IOException {
+        JsonObject permitMessage = new JsonObject();
+        permitMessage.add("type", new JsonPrimitive("permit"));
+        permitMessage.add("permitMessages", new JsonPrimitive(nbPermits));
+        this.getRemote().sendString(permitMessage.toString());
+    }
+
     public RemoteEndpoint getRemote() {
         return this.session.getRemote();
     }
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 4736cbe602..d8993d1773 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -47,7 +47,7 @@
 import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.websocket.data.ConsumerAck;
+import org.apache.pulsar.websocket.data.ConsumerCommand;
 import org.apache.pulsar.websocket.data.ConsumerMessage;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.WriteCallback;
@@ -72,8 +72,9 @@
     private SubscriptionType subscriptionType;
     private Consumer<byte[]> consumer;
 
-    private int maxPendingMessages;
+    private int maxPendingMessages = 0;
     private final AtomicInteger pendingMessages = new AtomicInteger();
+    private final boolean pullMode;
 
     private final LongAdder numMsgsDelivered;
     private final LongAdder numBytesDelivered;
@@ -90,13 +91,17 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request, Ser
         this.numMsgsDelivered = new LongAdder();
         this.numBytesDelivered = new LongAdder();
         this.numMsgsAcked = new LongAdder();
+        this.pullMode = Boolean.valueOf(queryParams.get("pullMode"));
 
         try {
             // checkAuth() and getConsumerConfiguration() should be called after assigning a value to this.subscription
             this.subscription = extractSubscription(request);
             builder = (ConsumerBuilderImpl<byte[]>) getConsumerConfiguration(service.getPulsarClient());
-            this.maxPendingMessages = (builder.getConf().getReceiverQueueSize() == 0) ? 1
-                    : builder.getConf().getReceiverQueueSize();
+
+            if (!this.pullMode) {
+                this.maxPendingMessages = (builder.getConf().getReceiverQueueSize() == 0) ? 1
+                        : builder.getConf().getReceiverQueueSize();
+            }
             this.subscriptionType = builder.getConf().getSubscriptionType();
 
             if (!checkAuth(response)) {
@@ -209,31 +214,43 @@ public void writeSuccess() {
     @Override
     public void onWebSocketConnect(Session session) {
         super.onWebSocketConnect(session);
-        receiveMessage();
+        if (!pullMode) {
+            receiveMessage();
+        }
     }
 
     @Override
     public void onWebSocketText(String message) {
         super.onWebSocketText(message);
 
-        // We should have received an ack
-
-        MessageId msgId;
         try {
-            ConsumerAck ack = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerAck.class);
-            msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(ack.messageId), topic);
+            ConsumerCommand command = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerCommand.class);
+            if ("permit".equals(command.type)) {
+                if (command.permitMessages == null) {
+                    throw new IOException("Missing required permitMessages field for 'permit' command");
+                }
+                if (this.pullMode) {
+                    int pending = pendingMessages.getAndAdd(-command.permitMessages);
+                    if (pending >= 0) {
+                        // Resume delivery
+                        receiveMessage();
+                    }
+                }
+            } else {
+                // We should have received an ack
+                MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId), topic);
+                consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
+                if (!this.pullMode) {
+                    int pending = pendingMessages.getAndDecrement();
+                    if (pending >= maxPendingMessages) {
+                        // Resume delivery
+                        receiveMessage();
+                    }
+                }
+            }
         } catch (IOException e) {
             log.warn("Failed to deserialize message id: {}", message, e);
             close(WebSocketError.FailedToDeserializeFromJSON);
-            return;
-        }
-
-        consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
-
-        int pending = pendingMessages.getAndDecrement();
-        if (pending >= maxPendingMessages) {
-            // Resume delivery
-            receiveMessage();
         }
     }
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerAck.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerCommand.java
similarity index 85%
rename from pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerAck.java
rename to pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerCommand.java
index a7493fb001..88112a8dc5 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerAck.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerCommand.java
@@ -18,13 +18,8 @@
  */
 package org.apache.pulsar.websocket.data;
 
-public class ConsumerAck {
+public class ConsumerCommand {
+    public String type;
     public String messageId;
-
-    public ConsumerAck() {
-    }
-
-    public ConsumerAck(String messageId) {
-        this.messageId = messageId;
-    }
+    public Integer permitMessages;
 }
diff --git a/site2/docs/client-libraries-websocket.md b/site2/docs/client-libraries-websocket.md
index 886604ff68..e2fbaf9d70 100644
--- a/site2/docs/client-libraries-websocket.md
+++ b/site2/docs/client-libraries-websocket.md
@@ -144,6 +144,11 @@ Key | Type | Required? | Explanation
 `priorityLevel` | int | no | Define a [priority](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setPriorityLevel-int-) for the consumer
 `maxRedeliverCount` | int | no | Define a [maxRedeliverCount](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: 0). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
 `deadLetterTopic` | string | no | Define a [deadLetterTopic](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: {topic}-{subscription}-DLQ). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
+`pullMode` | boolean | no | Enable pull mode (default: false). See "Flow control" below.
+
+NB: these parameters (except `pullMode`) apply to the internal consumer of the WebSocket service.
+So messages will be subject to the redelivery settings as soon as they get into the receive queue,
+even if the client doesn't consume on the WebSocket.
 
 ##### Receiving messages
 
@@ -181,6 +186,33 @@ Key | Type | Required? | Explanation
 :---|:-----|:----------|:-----------
 `messageId`| string | yes | Message ID of the processed message
 
+#### Flow control
+
+##### Push Mode
+
+By default (`pullMode=false`), the consumer endpoint will use the `receiverQueueSize` parameter both to size its
+internal receive queue and to limit the number of unacknowledged messages that are passed to the WebSocket client.
+In this mode, if you don't send acknowledgements, the Pulsar WebSocket service will stop sending messages after reaching
+`receiverQueueSize` unacked messages sent to the WebSocket client.
+
+##### Pull Mode
+
+If you set `pullMode` to `true`, the WebSocket client will need to send `permit` commands to permit the
+Pulsar WebSocket service to send more messages.
+
+```json
+{
+  "type": "permit",
+  "permitMessages": 100
+}
+```
+
+Key | Type | Required? | Explanation
+:---|:-----|:----------|:-----------
+`type`| string | yes | Type of command. Must be `permit`
+`permitMessages`| int | yes | Number of messages to permit
+
+NB: in this mode it's possible to acknowledge messages on a different connection.
 
 ### Reader endpoint
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services