You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/17 03:16:09 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #8249: Nack support in WS

codelipenghui commented on a change in pull request #8249:
URL: https://github.com/apache/pulsar/pull/8249#discussion_r524858103



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
##########
@@ -564,6 +566,76 @@ public void socketPullModeTest() throws Exception {
         }
     }
 
+    @Test(timeOut = 10000)
+    public void nackMessageTest() throws Exception {
+        final String subscription = "my-sub";
+        final String dlqTopic = "my-property/my-ns/my-topic10";
+        final String consumerTopic = "my-property/my-ns/my-topic9";
+
+        final String dlqUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+          "/ws/v2/consumer/persistent/" +
+          dlqTopic + "/" + subscription +
+          "?subscriptionType=Shared";
+
+        final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+          "/ws/v2/consumer/persistent/" +
+          consumerTopic + "/" + subscription +
+          "?deadLetterTopic=" + dlqTopic +
+          "&maxRedeliverCount=0&subscriptionType=Shared&ackTimeoutMillis=1000&negativeAckRedeliveryDelay=1000";
+
+        final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+          "/ws/v2/producer/persistent/" + consumerTopic;
+
+        WebSocketClient consumeClient1 = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
+        WebSocketClient consumeClient2 = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
+        WebSocketClient produceClient = new WebSocketClient();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        consumeSocket1.setMessageHandler((id, data) -> {
+            JsonObject nack = new JsonObject();
+            nack.add("messageId", new JsonPrimitive(id));
+            nack.add("type", new JsonPrimitive("negativeAcknowledge"));
+            return nack.toString();
+        });
+
+        try {
+            consumeClient1.start();
+            consumeClient2.start();
+            ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
+            ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest();
+            Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, URI.create(consumerUri), consumeRequest1);
+            Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, URI.create(dlqUri), consumeRequest2);
+
+            assertTrue(consumerFuture1.get().isOpen());
+            assertTrue(consumerFuture2.get().isOpen());
+
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            produceClient.start();
+            Future<Session> producerFuture = produceClient.connect(produceSocket, URI.create(producerUri), produceRequest);
+            assertTrue(producerFuture.get().isOpen());
+
+            assertEquals(consumeSocket1.getReceivedMessagesCount(), 0);
+            assertEquals(consumeSocket2.getReceivedMessagesCount(), 0);
+
+            produceSocket.sendMessage(1);
+
+            Thread.sleep(500);
+
+            //assertEquals(consumeSocket1.getReceivedMessagesCount(), 1);

Review comment:
       I think you can refine the test as https://github.com/apache/pulsar/pull/8557 does. Sleep also introduces too much flaky tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org