You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/04 13:02:17 UTC
[pulsar] 08/09: [websocket] Query parameter
"negativeAckRedeliveryDelay" should be effective even if DLQ is disabled
(#11495)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9917785902c19dc37f54ffae070887a6f9b86ac4
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Wed Aug 4 14:10:38 2021 +0900
[websocket] Query parameter "negativeAckRedeliveryDelay" should be effective even if DLQ is disabled (#11495)
### Motivation
On the consumer endpoint of the WebSocket API, the query parameter `negativeAckRedeliveryDelay` lets a client specify how long to wait before a negatively acknowledged message is redelivered.
However, this parameter is currently ignored when DLQ is disabled. I think this is an implementation mistake. Users should be able to specify `negativeAckRedeliveryDelay` even if DLQ is disabled.
https://github.com/apache/pulsar/blob/ee202d06548e3c73d70ad52374658ee3507ca809/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java#L389-L403
Related PR: https://github.com/apache/pulsar/pull/8249
### Modifications
Fixed the WebSocket `ConsumerHandler` to use the `negativeAckRedeliveryDelay` value specified by the client even if DLQ is disabled. In addition, fixed a flawed test (`ProxyPublishConsumeTest#nackMessageTest()`).
(cherry picked from commit 1f76d0de94eda94b91648f6235acde6ebd02d43a)
---
.../websocket/proxy/ProxyPublishConsumeTest.java | 77 ++++++++++++++++++----
.../websocket/proxy/SimpleProducerSocket.java | 10 ++-
.../apache/pulsar/websocket/ConsumerHandler.java | 8 ++-
site2/docs/client-libraries-websocket.md | 2 +-
4 files changed, 79 insertions(+), 18 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 2019a9e..941e410 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -769,11 +770,11 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
}
}
- @Test(timeOut = 10000)
+ @Test(timeOut = 20000)
public void nackMessageTest() throws Exception {
final String subscription = "my-sub";
- final String dlqTopic = "my-property/my-ns/my-topic10";
- final String consumerTopic = "my-property/my-ns/my-topic9";
+ final String dlqTopic = "my-property/my-ns/nack-msg-dlq-" + UUID.randomUUID();
+ final String consumerTopic = "my-property/my-ns/nack-msg-" + UUID.randomUUID();
final String dlqUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
"/ws/v2/consumer/persistent/" +
@@ -784,7 +785,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
"/ws/v2/consumer/persistent/" +
consumerTopic + "/" + subscription +
"?deadLetterTopic=" + dlqTopic +
- "&maxRedeliverCount=0&subscriptionType=Shared&ackTimeoutMillis=1000&negativeAckRedeliveryDelay=1000";
+ "&maxRedeliverCount=1&subscriptionType=Shared&negativeAckRedeliveryDelay=1000";
final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
"/ws/v2/producer/persistent/" + consumerTopic;
@@ -794,7 +795,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
WebSocketClient consumeClient2 = new WebSocketClient();
SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
WebSocketClient produceClient = new WebSocketClient();
- SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+ SimpleProducerSocket produceSocket = new SimpleProducerSocket(0);
consumeSocket1.setMessageHandler((id, data) -> {
JsonObject nack = new JsonObject();
@@ -824,18 +825,70 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
produceSocket.sendMessage(1);
- Thread.sleep(500);
+ // Main topic
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(consumeSocket1.getReceivedMessagesCount(), 2));
- //assertEquals(consumeSocket1.getReceivedMessagesCount(), 1);
- assertTrue(consumeSocket1.getReceivedMessagesCount() > 0);
+ // DLQ
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(consumeSocket2.getReceivedMessagesCount(), 1));
+ } finally {
+ stopWebSocketClient(consumeClient1, consumeClient2, produceClient);
+ }
+ }
- Thread.sleep(500);
+ @Test(timeOut = 20000)
+ public void nackRedeliveryDelayTest() throws Exception {
+ final String uriBase = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2";
+ final String topic = "my-property/my-ns/nack-redelivery-delay-" + UUID.randomUUID();
+ final String sub = "my-sub";
+ final int delayTime = 5000;
+
+ final String consumerUri = String.format("%s/consumer/persistent/%s/%s?negativeAckRedeliveryDelay=%d", uriBase,
+ topic, sub, delayTime);
+
+ final String producerUri = String.format("%s/producer/persistent/%s", uriBase, topic);
+
+ final WebSocketClient consumeClient = new WebSocketClient();
+ final SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+
+ final WebSocketClient produceClient = new WebSocketClient();
+ final SimpleProducerSocket produceSocket = new SimpleProducerSocket(0);
+
+ consumeSocket.setMessageHandler((mid, data) -> {
+ JsonObject nack = new JsonObject();
+ nack.add("type", new JsonPrimitive("negativeAcknowledge"));
+ nack.add("messageId", new JsonPrimitive(mid));
+ return nack.toString();
+ });
+
+ try {
+ consumeClient.start();
+ final ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+ final Future<Session> consumerFuture = consumeClient.connect(consumeSocket, URI.create(consumerUri),
+ consumeRequest);
+ assertTrue(consumerFuture.get().isOpen());
- //assertEquals(consumeSocket2.getReceivedMessagesCount(), 1);
- assertTrue(consumeSocket1.getReceivedMessagesCount() > 0);
+ produceClient.start();
+ final ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+ final Future<Session> producerFuture = produceClient.connect(produceSocket, URI.create(producerUri),
+ produceRequest);
+ assertTrue(producerFuture.get().isOpen());
+ assertEquals(consumeSocket.getReceivedMessagesCount(), 0);
+
+ produceSocket.sendMessage(1);
+
+ Awaitility.await().atMost(delayTime - 1000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> assertEquals(consumeSocket.getReceivedMessagesCount(), 1));
+
+ // Nacked message should be redelivered after 5 seconds
+ Thread.sleep(delayTime);
+
+ Awaitility.await().atMost(delayTime - 1000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> assertEquals(consumeSocket.getReceivedMessagesCount(), 2));
} finally {
- stopWebSocketClient(consumeClient1, consumeClient2, produceClient);
+ stopWebSocketClient(consumeClient, produceClient);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java
index 149e46b..0d15e56 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java
@@ -46,10 +46,16 @@ public class SimpleProducerSocket {
private final CountDownLatch closeLatch;
private Session session;
private final ArrayList<String> producerBuffer;
+ private final int messagesToSendWhenConnected;
public SimpleProducerSocket() {
+ this(10);
+ }
+
+ public SimpleProducerSocket(int messagesToSendWhenConnected) {
this.closeLatch = new CountDownLatch(1);
- producerBuffer = new ArrayList<>();
+ this.producerBuffer = new ArrayList<>();
+ this.messagesToSendWhenConnected = messagesToSendWhenConnected;
}
private static String getTestJsonPayload(int index) throws JsonProcessingException {
@@ -74,7 +80,7 @@ public class SimpleProducerSocket {
public void onConnect(Session session) throws Exception {
log.info("Got connect: {}", session);
this.session = session;
- sendMessage(10);
+ sendMessage(this.messagesToSendWhenConnected);
}
public void sendMessage(int totalMsgs) throws Exception {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index c798739..744cd78 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -387,6 +387,11 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
builder.priorityLevel(Integer.parseInt(queryParams.get("priorityLevel")));
}
+ if (queryParams.containsKey("negativeAckRedeliveryDelay")) {
+ builder.negativeAckRedeliveryDelay(Integer.parseInt(queryParams.get("negativeAckRedeliveryDelay")),
+ TimeUnit.MILLISECONDS);
+ }
+
if (queryParams.containsKey("maxRedeliverCount") || queryParams.containsKey("deadLetterTopic")) {
DeadLetterPolicy.DeadLetterPolicyBuilder dlpBuilder = DeadLetterPolicy.builder();
if (queryParams.containsKey("maxRedeliverCount")) {
@@ -397,9 +402,6 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
if (queryParams.containsKey("deadLetterTopic")) {
dlpBuilder.deadLetterTopic(queryParams.get("deadLetterTopic"));
}
- if (queryParams.containsKey("negativeAckRedeliveryDelay")) {
- builder.negativeAckRedeliveryDelay(Integer.parseInt(queryParams.get("negativeAckRedeliveryDelay")), TimeUnit.MILLISECONDS);
- }
builder.deadLetterPolicy(dlpBuilder.build());
}
diff --git a/site2/docs/client-libraries-websocket.md b/site2/docs/client-libraries-websocket.md
index d887e45..4af8979 100644
--- a/site2/docs/client-libraries-websocket.md
+++ b/site2/docs/client-libraries-websocket.md
@@ -167,7 +167,7 @@ Key | Type | Required? | Explanation
`maxRedeliverCount` | int | no | Define a [maxRedeliverCount](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: 0). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
`deadLetterTopic` | string | no | Define a [deadLetterTopic](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: {topic}-{subscription}-DLQ). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
`pullMode` | boolean | no | Enable pull mode (default: false). See "Flow Control" below.
-`negativeAckRedeliveryDelay` | int | no | When a message is negatively acknowledged, it will be redelivered to the DLQ.
+`negativeAckRedeliveryDelay` | int | no | When a message is negatively acknowledged, the delay time before the message is redelivered (in milliseconds). The default value is 60000.
`token` | string | no | Authentication token, this is used for the browser javascript client
NB: these parameter (except `pullMode`) apply to the internal consumer of the WebSocket service.