You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/04 12:33:12 UTC
[pulsar] 03/15: Fixed flaky test MemoryLimitTest#testRejectMessages (#14220) (#14628)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 35116b7b69eb6158bebbe82523dfb65499603b8e
Author: wuxuanqicn <89...@users.noreply.github.com>
AuthorDate: Sat Mar 12 13:50:05 2022 +0800
Fixed flaky test MemoryLimitTest#testRejectMessages (#14220) (#14628)
Co-authored-by: xuanqi.wu <xu...@weimob.com>
(cherry picked from commit 5f8db372ee3926f93eb109ab3b713038c3b523c8)
---
.../apache/pulsar/client/api/MemoryLimitTest.java | 86 ++++++++++++----------
1 file changed, 47 insertions(+), 39 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java
index ec98e7d1dbe..431991e61aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java
@@ -18,20 +18,22 @@
*/
package org.apache.pulsar.client.api;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
-
-import java.util.concurrent.CountDownLatch;
-
import lombok.Cleanup;
-
import org.apache.pulsar.client.api.PulsarClientException.MemoryBufferIsFullError;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarTestClient;
+import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
@Test(groups = "broker-api")
public class MemoryLimitTest extends ProducerConsumerBase {
@@ -62,27 +64,30 @@ public class MemoryLimitTest extends ProducerConsumerBase {
throws Exception {
String topic = newTopicName();
- @Cleanup
- PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
+ ClientBuilder clientBuilder = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
- .memoryLimit(100, SizeUnit.KILO_BYTES)
- .build();
+ .memoryLimit(100, SizeUnit.KILO_BYTES);
@Cleanup
- Producer<byte[]> producer = client.newProducer()
+ PulsarTestClient client = PulsarTestClient.create(clientBuilder);
+
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer()
.topic(topic)
.blockIfQueueFull(false)
+ .sendTimeout(5, TimeUnit.SECONDS)
.create();
+ // make sure all message pending at pendingMessages queue
+ // connection with broker can not be established, so handleSendReceipt will not be invoked while sending message
+ client.dropOpSendMessages();
final int n = 101;
- CountDownLatch latch = new CountDownLatch(n);
-
for (int i = 0; i < n; i++) {
- producer.sendAsync(new byte[1024]).thenRun(() -> {
- latch.countDown();
- });
+ producer.sendAsync(new byte[1024]);
}
-
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> producer.getPendingQueueSize() == n);
assertEquals(client.getMemoryLimitController().currentUsage(), n * 1024);
try {
@@ -92,8 +97,10 @@ public class MemoryLimitTest extends ProducerConsumerBase {
// Expected
}
- latch.await();
-
+ client.allowReconnecting();
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(30))
+ .until(() -> producer.getPendingQueueSize() == 0);
assertEquals(client.getMemoryLimitController().currentUsage(), 0);
// We should now be able to send again
@@ -105,41 +112,40 @@ public class MemoryLimitTest extends ProducerConsumerBase {
String t1 = newTopicName();
String t2 = newTopicName();
- @Cleanup
- PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
+ ClientBuilder clientBuilder = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
- .memoryLimit(100, SizeUnit.KILO_BYTES)
- .build();
+ .memoryLimit(100, SizeUnit.KILO_BYTES);
+
+ @Cleanup
+ PulsarTestClient client = PulsarTestClient.create(clientBuilder);
@Cleanup
- Producer<byte[]> p1 = client.newProducer()
+ ProducerImpl<byte[]> p1 = (ProducerImpl<byte[]>) client.newProducer()
.topic(t1)
.blockIfQueueFull(false)
+ .sendTimeout(5, TimeUnit.SECONDS)
.create();
@Cleanup
- Producer<byte[]> p2 = client.newProducer()
+ ProducerImpl<byte[]> p2 = (ProducerImpl<byte[]>) client.newProducer()
.topic(t2)
.blockIfQueueFull(false)
+ .sendTimeout(5, TimeUnit.SECONDS)
.create();
+ client.dropOpSendMessages();
final int n = 101;
- CountDownLatch latch = new CountDownLatch(n);
-
for (int i = 0; i < n / 2; i++) {
- p1.sendAsync(new byte[1024]).thenRun(() -> {
- latch.countDown();
- });
- p2.sendAsync(new byte[1024]).thenRun(() -> {
- latch.countDown();
- });
+ p1.sendAsync(new byte[1024]);
+ p2.sendAsync(new byte[1024]);
}
// Last message in order to reach the limit
- p1.sendAsync(new byte[1024]).thenRun(() -> {
- latch.countDown();
- });
+ p1.sendAsync(new byte[1024]);
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> (p1.getPendingQueueSize() + p2.getPendingQueueSize()) == n);
assertEquals(client.getMemoryLimitController().currentUsage(), n * 1024);
try {
@@ -156,8 +162,10 @@ public class MemoryLimitTest extends ProducerConsumerBase {
// Expected
}
- latch.await();
-
+ client.allowReconnecting();
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(30))
+ .until(() -> (p1.getPendingQueueSize() + p2.getPendingQueueSize()) == 0);
assertEquals(client.getMemoryLimitController().currentUsage(), 0);
// We should now be able to send again