You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/03/12 20:26:41 UTC
(pulsar) branch master updated: [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 43f9d2abb9d [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)
43f9d2abb9d is described below
commit 43f9d2abb9d5cd788fe18da6af7ad6fbfb3bc07b
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Mar 12 22:26:34 2024 +0200
[fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)
---
.../RGUsageMTAggrWaitForAllMsgsTest.java | 30 +++++++++++-----------
1 file changed, 15 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 9aec93cf1ff..392ec0d3ff4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -20,6 +20,10 @@ package org.apache.pulsar.broker.resourcegroup;
import com.google.common.collect.Sets;
import io.prometheus.client.Summary;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
@@ -45,11 +49,6 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
// The tests implement a set of producer/consumer operations on a set of topics.
// [A thread is started for each producer, and each consumer in the test.]
@@ -57,6 +56,7 @@ import java.util.concurrent.TimeUnit;
// After sending/receiving all the messages, traffic usage statistics, and Prometheus-metrics
// are verified on the RGs.
@Slf4j
+@Test(groups = "flaky")
public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
@BeforeClass
@Override
@@ -119,9 +119,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
private final int numMesgsToProduce;
private final String myProduceTopic;
- private int sentNumBytes = 0;
- private int sentNumMsgs = 0;
- private int numExceptions = 0;
+ private volatile int sentNumBytes = 0;
+ private volatile int sentNumMsgs = 0;
+ private volatile int numExceptions = 0;
ProduceMessages(int prodId, int nMesgs, String[] topics) {
producerId = prodId;
@@ -202,9 +202,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
private final int recvTimeoutMilliSecs = 1000;
private final int ackTimeoutMilliSecs = 1100; // has to be more than 1 second
- private int recvdNumBytes = 0;
- private int recvdNumMsgs = 0;
- private int numExceptions = 0;
+ private volatile int recvdNumBytes = 0;
+ private volatile int recvdNumMsgs = 0;
+ private volatile int numExceptions = 0;
private volatile boolean allMessagesReceived = false;
private volatile boolean consumerIsReady = false;
@@ -494,15 +494,15 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
while (numConsumersDone < NUM_CONSUMERS) {
for (int ix = 0; ix < NUM_CONSUMERS; ix++) {
if (!joinedConsumers[ix]) {
+ consThr[ix].thread.join();
+ joinedConsumers[ix] = true;
+ log.debug("Joined consumer={}", ix);
+
recvdBytes = consThr[ix].consumer.getNumBytesRecvd();
recvdMsgs = consThr[ix].consumer.getNumMessagesRecvd();
numConsumerExceptions += consThr[ix].consumer.getNumExceptions();
log.debug("Consumer={} received {} mesgs and {} bytes", ix, recvdMsgs, recvdBytes);
- consThr[ix].thread.join();
- joinedConsumers[ix] = true;
- log.debug("Joined consumer={}", ix);
-
recvdNumBytes += recvdBytes;
recvdNumMsgs += recvdMsgs;
numConsumersDone++;