You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/08/04 05:11:15 UTC

[pulsar] branch master updated: [websocket] Query parameter "negativeAckRedeliveryDelay" should be effective even if DLQ is disabled (#11495)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f76d0d  [websocket] Query parameter "negativeAckRedeliveryDelay" should be effective even if DLQ is disabled (#11495)
1f76d0d is described below

commit 1f76d0de94eda94b91648f6235acde6ebd02d43a
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Wed Aug 4 14:10:38 2021 +0900

    [websocket] Query parameter "negativeAckRedeliveryDelay" should be effective even if DLQ is disabled (#11495)
    
    ### Motivation
    
    On the consumer endpoint of the WebSocket API, the query parameter `negativeAckRedeliveryDelay` specifies the delay before a negatively acknowledged message is redelivered.
    
    However, this parameter is currently ignored when DLQ is disabled, i.e. when neither `maxRedeliverCount` nor `deadLetterTopic` is specified. I think this is an implementation mistake. Users should be able to specify `negativeAckRedeliveryDelay` even if DLQ is disabled.
    https://github.com/apache/pulsar/blob/ee202d06548e3c73d70ad52374658ee3507ca809/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java#L389-L403
    
    Related PR: https://github.com/apache/pulsar/pull/8249
    
    ### Modifications
    
    Fixed the WebSocket `ConsumerHandler` to use the `negativeAckRedeliveryDelay` value specified by the client even if DLQ is disabled. In addition, fixed the inappropriate test code in `ProxyPublishConsumeTest#nackMessageTest()`, which only asserted on the first consumer and never verified that the message reached the DLQ consumer.
---
 .../websocket/proxy/ProxyPublishConsumeTest.java   | 77 ++++++++++++++++++----
 .../websocket/proxy/SimpleProducerSocket.java      | 10 ++-
 .../apache/pulsar/websocket/ConsumerHandler.java   |  8 ++-
 site2/docs/client-libraries-websocket.md           |  2 +-
 4 files changed, 79 insertions(+), 18 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 2019a9e..941e410 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -769,11 +770,11 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
         }
     }
 
-    @Test(timeOut = 10000)
+    @Test(timeOut = 20000)
     public void nackMessageTest() throws Exception {
         final String subscription = "my-sub";
-        final String dlqTopic = "my-property/my-ns/my-topic10";
-        final String consumerTopic = "my-property/my-ns/my-topic9";
+        final String dlqTopic = "my-property/my-ns/nack-msg-dlq-" + UUID.randomUUID();
+        final String consumerTopic = "my-property/my-ns/nack-msg-" + UUID.randomUUID();
 
         final String dlqUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
           "/ws/v2/consumer/persistent/" +
@@ -784,7 +785,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
           "/ws/v2/consumer/persistent/" +
           consumerTopic + "/" + subscription +
           "?deadLetterTopic=" + dlqTopic +
-          "&maxRedeliverCount=0&subscriptionType=Shared&ackTimeoutMillis=1000&negativeAckRedeliveryDelay=1000";
+          "&maxRedeliverCount=1&subscriptionType=Shared&negativeAckRedeliveryDelay=1000";
 
         final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
           "/ws/v2/producer/persistent/" + consumerTopic;
@@ -794,7 +795,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
         WebSocketClient consumeClient2 = new WebSocketClient();
         SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
         WebSocketClient produceClient = new WebSocketClient();
-        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket(0);
 
         consumeSocket1.setMessageHandler((id, data) -> {
             JsonObject nack = new JsonObject();
@@ -824,18 +825,70 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
 
             produceSocket.sendMessage(1);
 
-            Thread.sleep(500);
+            // Main topic
+            Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                    .untilAsserted(() -> assertEquals(consumeSocket1.getReceivedMessagesCount(), 2));
 
-            //assertEquals(consumeSocket1.getReceivedMessagesCount(), 1);
-            assertTrue(consumeSocket1.getReceivedMessagesCount() > 0);
+            // DLQ
+            Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                    .untilAsserted(() -> assertEquals(consumeSocket2.getReceivedMessagesCount(), 1));
+        } finally {
+            stopWebSocketClient(consumeClient1, consumeClient2, produceClient);
+        }
+    }
 
-            Thread.sleep(500);
+    @Test(timeOut = 20000)
+    public void nackRedeliveryDelayTest() throws Exception {
+        final String uriBase = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2";
+        final String topic = "my-property/my-ns/nack-redelivery-delay-" + UUID.randomUUID();
+        final String sub = "my-sub";
+        final int delayTime = 5000;
+
+        final String consumerUri = String.format("%s/consumer/persistent/%s/%s?negativeAckRedeliveryDelay=%d", uriBase,
+                topic, sub, delayTime);
+
+        final String producerUri = String.format("%s/producer/persistent/%s", uriBase, topic);
+
+        final WebSocketClient consumeClient = new WebSocketClient();
+        final SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+
+        final WebSocketClient produceClient = new WebSocketClient();
+        final SimpleProducerSocket produceSocket = new SimpleProducerSocket(0);
+
+        consumeSocket.setMessageHandler((mid, data) -> {
+            JsonObject nack = new JsonObject();
+            nack.add("type", new JsonPrimitive("negativeAcknowledge"));
+            nack.add("messageId", new JsonPrimitive(mid));
+            return nack.toString();
+        });
+
+        try {
+            consumeClient.start();
+            final ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+            final Future<Session> consumerFuture = consumeClient.connect(consumeSocket, URI.create(consumerUri),
+                    consumeRequest);
+            assertTrue(consumerFuture.get().isOpen());
 
-            //assertEquals(consumeSocket2.getReceivedMessagesCount(), 1);
-            assertTrue(consumeSocket1.getReceivedMessagesCount() > 0);
+            produceClient.start();
+            final ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            final Future<Session> producerFuture = produceClient.connect(produceSocket, URI.create(producerUri),
+                    produceRequest);
+            assertTrue(producerFuture.get().isOpen());
 
+            assertEquals(consumeSocket.getReceivedMessagesCount(), 0);
+
+            produceSocket.sendMessage(1);
+
+            Awaitility.await().atMost(delayTime - 1000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(() -> assertEquals(consumeSocket.getReceivedMessagesCount(), 1));
+
+            // Nacked message should be redelivered after 5 seconds
+            Thread.sleep(delayTime);
+
+            Awaitility.await().atMost(delayTime - 1000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(() -> assertEquals(consumeSocket.getReceivedMessagesCount(), 2));
         } finally {
-            stopWebSocketClient(consumeClient1, consumeClient2, produceClient);
+            stopWebSocketClient(consumeClient, produceClient);
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java
index 149e46b..0d15e56 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java
@@ -46,10 +46,16 @@ public class SimpleProducerSocket {
     private final CountDownLatch closeLatch;
     private Session session;
     private final ArrayList<String> producerBuffer;
+    private final int messagesToSendWhenConnected;
 
     public SimpleProducerSocket() {
+        this(10);
+    }
+
+    public SimpleProducerSocket(int messagesToSendWhenConnected) {
         this.closeLatch = new CountDownLatch(1);
-        producerBuffer = new ArrayList<>();
+        this.producerBuffer = new ArrayList<>();
+        this.messagesToSendWhenConnected = messagesToSendWhenConnected;
     }
 
     private static String getTestJsonPayload(int index) throws JsonProcessingException {
@@ -74,7 +80,7 @@ public class SimpleProducerSocket {
     public void onConnect(Session session) throws Exception {
         log.info("Got connect: {}", session);
         this.session = session;
-        sendMessage(10);
+        sendMessage(this.messagesToSendWhenConnected);
     }
 
     public void sendMessage(int totalMsgs) throws Exception {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 525ea0f..a1c76d2 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -386,6 +386,11 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
             builder.priorityLevel(Integer.parseInt(queryParams.get("priorityLevel")));
         }
 
+        if (queryParams.containsKey("negativeAckRedeliveryDelay")) {
+            builder.negativeAckRedeliveryDelay(Integer.parseInt(queryParams.get("negativeAckRedeliveryDelay")),
+                    TimeUnit.MILLISECONDS);
+        }
+
         if (queryParams.containsKey("maxRedeliverCount") || queryParams.containsKey("deadLetterTopic")) {
             DeadLetterPolicy.DeadLetterPolicyBuilder dlpBuilder = DeadLetterPolicy.builder();
             if (queryParams.containsKey("maxRedeliverCount")) {
@@ -396,9 +401,6 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
             if (queryParams.containsKey("deadLetterTopic")) {
                 dlpBuilder.deadLetterTopic(queryParams.get("deadLetterTopic"));
             }
-            if (queryParams.containsKey("negativeAckRedeliveryDelay")) {
-                builder.negativeAckRedeliveryDelay(Integer.parseInt(queryParams.get("negativeAckRedeliveryDelay")), TimeUnit.MILLISECONDS);
-            }
             builder.deadLetterPolicy(dlpBuilder.build());
         }
 
diff --git a/site2/docs/client-libraries-websocket.md b/site2/docs/client-libraries-websocket.md
index d887e45..4af8979 100644
--- a/site2/docs/client-libraries-websocket.md
+++ b/site2/docs/client-libraries-websocket.md
@@ -167,7 +167,7 @@ Key | Type | Required? | Explanation
 `maxRedeliverCount` | int | no | Define a [maxRedeliverCount](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: 0). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
 `deadLetterTopic` | string | no | Define a [deadLetterTopic](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: {topic}-{subscription}-DLQ). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
 `pullMode` | boolean | no | Enable pull mode (default: false). See "Flow Control" below.
-`negativeAckRedeliveryDelay` | int | no | When a message is negatively acknowledged, it will be redelivered to the DLQ.
+`negativeAckRedeliveryDelay` | int | no | When a message is negatively acknowledged, the delay time before the message is redelivered (in milliseconds). The default value is 60000.
 `token` | string | no | Authentication token, this is used for the browser javascript client
 
 NB: these parameter (except `pullMode`) apply to the internal consumer of the WebSocket service.