You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/04 12:33:12 UTC

[pulsar] 03/15: Fixed flaky test MemoryLimitTest#testRejectMessages (#14220) (#14628)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 35116b7b69eb6158bebbe82523dfb65499603b8e
Author: wuxuanqicn <89...@users.noreply.github.com>
AuthorDate: Sat Mar 12 13:50:05 2022 +0800

    Fixed flaky test MemoryLimitTest#testRejectMessages (#14220) (#14628)
    
    Co-authored-by: xuanqi.wu <xu...@weimob.com>
    (cherry picked from commit 5f8db372ee3926f93eb109ab3b713038c3b523c8)
---
 .../apache/pulsar/client/api/MemoryLimitTest.java  | 86 ++++++++++++----------
 1 file changed, 47 insertions(+), 39 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java
index ec98e7d1dbe..431991e61aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java
@@ -18,20 +18,22 @@
  */
 package org.apache.pulsar.client.api;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
-
-import java.util.concurrent.CountDownLatch;
-
 import lombok.Cleanup;
-
 import org.apache.pulsar.client.api.PulsarClientException.MemoryBufferIsFullError;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarTestClient;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
 @Test(groups = "broker-api")
 public class MemoryLimitTest extends ProducerConsumerBase {
 
@@ -62,27 +64,30 @@ public class MemoryLimitTest extends ProducerConsumerBase {
             throws Exception {
         String topic = newTopicName();
 
-        @Cleanup
-        PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
+        ClientBuilder clientBuilder = PulsarClient.builder()
                 .serviceUrl(pulsar.getBrokerServiceUrl())
-                .memoryLimit(100, SizeUnit.KILO_BYTES)
-                .build();
+                .memoryLimit(100, SizeUnit.KILO_BYTES);
 
         @Cleanup
-        Producer<byte[]> producer = client.newProducer()
+        PulsarTestClient client = PulsarTestClient.create(clientBuilder);
+
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer()
                 .topic(topic)
                 .blockIfQueueFull(false)
+                .sendTimeout(5, TimeUnit.SECONDS)
                 .create();
 
+        // Make sure all messages stay pending in the pendingMessages queue:
+        // the connection to the broker cannot be established, so handleSendReceipt will not be invoked while sending messages.
+        client.dropOpSendMessages();
         final int n = 101;
-        CountDownLatch latch = new CountDownLatch(n);
-
         for (int i = 0; i < n; i++) {
-            producer.sendAsync(new byte[1024]).thenRun(() -> {
-                latch.countDown();
-            });
+            producer.sendAsync(new byte[1024]);
         }
-
+        Awaitility.await()
+                .atMost(Duration.ofSeconds(5))
+                .until(() -> producer.getPendingQueueSize() == n);
         assertEquals(client.getMemoryLimitController().currentUsage(), n * 1024);
 
         try {
@@ -92,8 +97,10 @@ public class MemoryLimitTest extends ProducerConsumerBase {
             // Expected
         }
 
-        latch.await();
-
+        client.allowReconnecting();
+        Awaitility.await()
+                .atMost(Duration.ofSeconds(30))
+                .until(() -> producer.getPendingQueueSize() == 0);
         assertEquals(client.getMemoryLimitController().currentUsage(), 0);
 
         // We should now be able to send again
@@ -105,41 +112,40 @@ public class MemoryLimitTest extends ProducerConsumerBase {
         String t1 = newTopicName();
         String t2 = newTopicName();
 
-        @Cleanup
-        PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
+        ClientBuilder clientBuilder = PulsarClient.builder()
                 .serviceUrl(pulsar.getBrokerServiceUrl())
-                .memoryLimit(100, SizeUnit.KILO_BYTES)
-                .build();
+                .memoryLimit(100, SizeUnit.KILO_BYTES);
+
+        @Cleanup
+        PulsarTestClient client = PulsarTestClient.create(clientBuilder);
 
         @Cleanup
-        Producer<byte[]> p1 = client.newProducer()
+        ProducerImpl<byte[]> p1 = (ProducerImpl<byte[]>) client.newProducer()
                 .topic(t1)
                 .blockIfQueueFull(false)
+                .sendTimeout(5, TimeUnit.SECONDS)
                 .create();
 
         @Cleanup
-        Producer<byte[]> p2 = client.newProducer()
+        ProducerImpl<byte[]> p2 = (ProducerImpl<byte[]>) client.newProducer()
                 .topic(t2)
                 .blockIfQueueFull(false)
+                .sendTimeout(5, TimeUnit.SECONDS)
                 .create();
 
+        client.dropOpSendMessages();
         final int n = 101;
-        CountDownLatch latch = new CountDownLatch(n);
-
         for (int i = 0; i < n / 2; i++) {
-            p1.sendAsync(new byte[1024]).thenRun(() -> {
-                latch.countDown();
-            });
-            p2.sendAsync(new byte[1024]).thenRun(() -> {
-                latch.countDown();
-            });
+            p1.sendAsync(new byte[1024]);
+            p2.sendAsync(new byte[1024]);
         }
 
         // Last message in order to reach the limit
-        p1.sendAsync(new byte[1024]).thenRun(() -> {
-            latch.countDown();
-        });
+        p1.sendAsync(new byte[1024]);
 
+        Awaitility.await()
+                .atMost(Duration.ofSeconds(5))
+                .until(() -> (p1.getPendingQueueSize() + p2.getPendingQueueSize()) == n);
         assertEquals(client.getMemoryLimitController().currentUsage(), n * 1024);
 
         try {
@@ -156,8 +162,10 @@ public class MemoryLimitTest extends ProducerConsumerBase {
             // Expected
         }
 
-        latch.await();
-
+        client.allowReconnecting();
+        Awaitility.await()
+                .atMost(Duration.ofSeconds(30))
+                .until(() -> (p1.getPendingQueueSize() + p2.getPendingQueueSize()) == 0);
         assertEquals(client.getMemoryLimitController().currentUsage(), 0);
 
         // We should now be able to send again