You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/12/03 22:11:43 UTC
[pulsar] branch master updated: Add pull-mode on Websockets (#3058)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 66a68d5 Add pull-mode on Websockets (#3058)
66a68d5 is described below
commit 66a68d58172ce06930ec84a9b0e41944a2c8e5b6
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Mon Dec 3 23:11:38 2018 +0100
Add pull-mode on Websockets (#3058)
* Add pull-mode on Websockets
Fix #3052
* Add WebSocket pull-mode test
* Add doc on WebSocket pull mode
---
.../websocket/proxy/ProxyPublishConsumeTest.java | 60 ++++++++++++++++++++++
.../websocket/proxy/SimpleConsumerSocket.java | 7 +++
.../apache/pulsar/websocket/ConsumerHandler.java | 55 +++++++++++++-------
.../{ConsumerAck.java => ConsumerCommand.java} | 11 ++--
site2/docs/client-libraries-websocket.md | 32 ++++++++++++
5 files changed, 138 insertions(+), 27 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 7f1b5aa..9ddf955 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -465,6 +465,66 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
}
}
+ @Test(timeOut = 10000)
+ public void socketPullModeTest() throws Exception {
+ final String topic = "my-property/my-ns/my-topic8";
+ final String subscription = "my-sub";
+ final String consumerUri = String.format(
+ "ws://localhost:%d/ws/v2/consumer/persistent/%s/%s?pullMode=true&subscriptionType=Shared",
+ port, topic, subscription
+ );
+ final String producerUri = String.format("ws://localhost:%d/ws/v2/producer/persistent/%s", port, topic);
+
+ URI consumeUri = URI.create(consumerUri);
+ URI produceUri = URI.create(producerUri);
+
+ WebSocketClient consumeClient1 = new WebSocketClient();
+ SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
+ WebSocketClient consumeClient2 = new WebSocketClient();
+ SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
+ WebSocketClient produceClient = new WebSocketClient();
+ SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+ try {
+ consumeClient1.start();
+ consumeClient2.start();
+ ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
+ ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest();
+ Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
+ Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2);
+ log.info("Connecting to : {}", consumeUri);
+
+ // let it connect
+ Assert.assertTrue(consumerFuture1.get().isOpen());
+ Assert.assertTrue(consumerFuture2.get().isOpen());
+
+ ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+ produceClient.start();
+ Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+ Assert.assertTrue(producerFuture.get().isOpen());
+ produceSocket.sendMessage(100);
+
+ Thread.sleep(500);
+
+ // Verify no messages received despite production
+ Assert.assertEquals(consumeSocket1.getReceivedMessagesCount(), 0);
+ Assert.assertEquals(consumeSocket2.getReceivedMessagesCount(), 0);
+
+ consumeSocket1.sendPermits(3);
+ consumeSocket2.sendPermits(2);
+ consumeSocket2.sendPermits(2);
+ consumeSocket2.sendPermits(2);
+
+ Thread.sleep(500);
+
+ Assert.assertEquals(consumeSocket1.getReceivedMessagesCount(), 3);
+ Assert.assertEquals(consumeSocket2.getReceivedMessagesCount(), 6);
+
+ } finally {
+ stopWebSocketClient(consumeClient1, consumeClient2, produceClient);
+ }
+ }
+
private void verifyTopicStat(Client client, String baseUrl, String topic) {
String statUrl = baseUrl + topic + "/stats";
WebTarget webTarget = client.target(statUrl);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
index 09b29c4..c303ed5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
@@ -81,6 +81,13 @@ public class SimpleConsumerSocket {
this.getRemote().sendString(ack.toString());
}
+ public void sendPermits(int nbPermits) throws IOException {
+ JsonObject permitMessage = new JsonObject();
+ permitMessage.add("type", new JsonPrimitive("permit"));
+ permitMessage.add("permitMessages", new JsonPrimitive(nbPermits));
+ this.getRemote().sendString(permitMessage.toString());
+ }
+
public RemoteEndpoint getRemote() {
return this.session.getRemote();
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 4736cbe..d8993d1 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -47,7 +47,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.websocket.data.ConsumerAck;
+import org.apache.pulsar.websocket.data.ConsumerCommand;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
@@ -72,8 +72,9 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
private SubscriptionType subscriptionType;
private Consumer<byte[]> consumer;
- private int maxPendingMessages;
+ private int maxPendingMessages = 0;
private final AtomicInteger pendingMessages = new AtomicInteger();
+ private final boolean pullMode;
private final LongAdder numMsgsDelivered;
private final LongAdder numBytesDelivered;
@@ -90,13 +91,17 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
this.numMsgsDelivered = new LongAdder();
this.numBytesDelivered = new LongAdder();
this.numMsgsAcked = new LongAdder();
+ this.pullMode = Boolean.valueOf(queryParams.get("pullMode"));
try {
// checkAuth() and getConsumerConfiguration() should be called after assigning a value to this.subscription
this.subscription = extractSubscription(request);
builder = (ConsumerBuilderImpl<byte[]>) getConsumerConfiguration(service.getPulsarClient());
- this.maxPendingMessages = (builder.getConf().getReceiverQueueSize() == 0) ? 1
- : builder.getConf().getReceiverQueueSize();
+
+ if (!this.pullMode) {
+ this.maxPendingMessages = (builder.getConf().getReceiverQueueSize() == 0) ? 1
+ : builder.getConf().getReceiverQueueSize();
+ }
this.subscriptionType = builder.getConf().getSubscriptionType();
if (!checkAuth(response)) {
@@ -209,31 +214,43 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
@Override
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
- receiveMessage();
+ if (!pullMode) {
+ receiveMessage();
+ }
}
@Override
public void onWebSocketText(String message) {
super.onWebSocketText(message);
- // We should have received an ack
-
- MessageId msgId;
try {
- ConsumerAck ack = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerAck.class);
- msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(ack.messageId), topic);
+ ConsumerCommand command = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerCommand.class);
+ if ("permit".equals(command.type)) {
+ if (command.permitMessages == null) {
+ throw new IOException("Missing required permitMessages field for 'permit' command");
+ }
+ if (this.pullMode) {
+ int pending = pendingMessages.getAndAdd(-command.permitMessages);
+ if (pending >= 0) {
+ // Resume delivery
+ receiveMessage();
+ }
+ }
+ } else {
+ // We should have received an ack
+ MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId), topic);
+ consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
+ if (!this.pullMode) {
+ int pending = pendingMessages.getAndDecrement();
+ if (pending >= maxPendingMessages) {
+ // Resume delivery
+ receiveMessage();
+ }
+ }
+ }
} catch (IOException e) {
log.warn("Failed to deserialize message id: {}", message, e);
close(WebSocketError.FailedToDeserializeFromJSON);
- return;
- }
-
- consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
-
- int pending = pendingMessages.getAndDecrement();
- if (pending >= maxPendingMessages) {
- // Resume delivery
- receiveMessage();
}
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerAck.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerCommand.java
similarity index 85%
rename from pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerAck.java
rename to pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerCommand.java
index a7493fb..88112a8 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerAck.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerCommand.java
@@ -18,13 +18,8 @@
*/
package org.apache.pulsar.websocket.data;
-public class ConsumerAck {
+public class ConsumerCommand {
+ public String type;
public String messageId;
-
- public ConsumerAck() {
- }
-
- public ConsumerAck(String messageId) {
- this.messageId = messageId;
- }
+ public Integer permitMessages;
}
diff --git a/site2/docs/client-libraries-websocket.md b/site2/docs/client-libraries-websocket.md
index 886604f..e2fbaf9 100644
--- a/site2/docs/client-libraries-websocket.md
+++ b/site2/docs/client-libraries-websocket.md
@@ -144,6 +144,11 @@ Key | Type | Required? | Explanation
`priorityLevel` | int | no | Define a [priority](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setPriorityLevel-int-) for the consumer
`maxRedeliverCount` | int | no | Define a [maxRedeliverCount](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: 0). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
`deadLetterTopic` | string | no | Define a [deadLetterTopic](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: {topic}-{subscription}-DLQ). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
+`pullMode` | boolean | no | Enable pull mode (default: false). See "Flow Control" below.
+
+NB: these parameter (except `pullMode`) apply to the internal consumer of the WebSocket service.
+So messages will be subject to the redelivery settings as soon as the get into the receive queue,
+even if the client doesn't consume on the WebSocket.
##### Receiving messages
@@ -181,6 +186,33 @@ Key | Type | Required? | Explanation
:---|:-----|:----------|:-----------
`messageId`| string | yes | Message ID of the processed message
+#### Flow control
+
+##### Push Mode
+
+By default (`pullMode=false`), the consumer endpoint will use the `receiverQueueSize` parameter both to size its
+internal receive queue and to limit the number of unacknowledged messages that are passed to the WebSocket client.
+In this mode, if you don't send acknowledgements, the Pulsar WebSocket service will stop sending messages after reaching
+`receiverQueueSize` unacked messages sent to the WebSocket client.
+
+##### Pull Mode
+
+If you set `pullMode` to `true`, the WebSocket client will need to send `permit` commands to permit the
+Pulsar WebSocket service to send more messages.
+
+```json
+{
+ "type": "permit",
+ "permitMessages": 100
+}
+```
+
+Key | Type | Required? | Explanation
+:---|:-----|:----------|:-----------
+`type`| string | yes | Type of command. Must be `permit`
+`permitMessages`| int | yes | Number of messages to permit
+
+NB: in this mode it's possible to acknowledge messages in a different connection.
### Reader endpoint