You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/03/12 20:26:41 UTC

(pulsar) branch master updated: [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 43f9d2abb9d [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)
43f9d2abb9d is described below

commit 43f9d2abb9d5cd788fe18da6af7ad6fbfb3bc07b
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Mar 12 22:26:34 2024 +0200

    [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)
---
 .../RGUsageMTAggrWaitForAllMsgsTest.java           | 30 +++++++++++-----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 9aec93cf1ff..392ec0d3ff4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -20,6 +20,10 @@ package org.apache.pulsar.broker.resourcegroup;
 
 import com.google.common.collect.Sets;
 import io.prometheus.client.Summary;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
 import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
@@ -45,11 +49,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 
 // The tests implement a set of producer/consumer operations on a set of topics.
 // [A thread is started for each producer, and each consumer in the test.]
@@ -57,6 +56,7 @@ import java.util.concurrent.TimeUnit;
 // After sending/receiving all the messages, traffic usage statistics, and Prometheus-metrics
 // are verified on the RGs.
 @Slf4j
+@Test(groups = "flaky")
 public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
     @BeforeClass
     @Override
@@ -119,9 +119,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
         private final int numMesgsToProduce;
         private final String myProduceTopic;
 
-        private int sentNumBytes = 0;
-        private int sentNumMsgs = 0;
-        private int numExceptions = 0;
+        private volatile int sentNumBytes = 0;
+        private volatile int sentNumMsgs = 0;
+        private volatile int numExceptions = 0;
 
         ProduceMessages(int prodId, int nMesgs, String[] topics) {
             producerId = prodId;
@@ -202,9 +202,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
 
         private final int recvTimeoutMilliSecs = 1000;
         private final int ackTimeoutMilliSecs = 1100; // has to be more than 1 second
-        private int recvdNumBytes = 0;
-        private int recvdNumMsgs = 0;
-        private int numExceptions = 0;
+        private volatile int recvdNumBytes = 0;
+        private volatile int recvdNumMsgs = 0;
+        private volatile int numExceptions = 0;
         private volatile boolean allMessagesReceived = false;
         private volatile boolean consumerIsReady = false;
 
@@ -494,15 +494,15 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
         while (numConsumersDone < NUM_CONSUMERS) {
             for (int ix = 0; ix < NUM_CONSUMERS; ix++) {
                 if (!joinedConsumers[ix]) {
+                    consThr[ix].thread.join();
+                    joinedConsumers[ix] = true;
+                    log.debug("Joined consumer={}", ix);
+
                     recvdBytes = consThr[ix].consumer.getNumBytesRecvd();
                     recvdMsgs = consThr[ix].consumer.getNumMessagesRecvd();
                     numConsumerExceptions += consThr[ix].consumer.getNumExceptions();
                     log.debug("Consumer={} received {} mesgs and {} bytes", ix, recvdMsgs, recvdBytes);
 
-                    consThr[ix].thread.join();
-                    joinedConsumers[ix] = true;
-                    log.debug("Joined consumer={}", ix);
-
                     recvdNumBytes += recvdBytes;
                     recvdNumMsgs += recvdMsgs;
                     numConsumersDone++;