You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/25 06:05:33 UTC

[pulsar] branch master updated: [fix][flaky-test] BrokerInterceptorTest.testProducerCreation (#16742)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bc94643bc1a [fix][flaky-test] BrokerInterceptorTest.testProducerCreation (#16742)
bc94643bc1a is described below

commit bc94643bc1a7f365dfb75e389ac3e1156770a119
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Mon Jul 25 14:05:27 2022 +0800

    [fix][flaky-test] BrokerInterceptorTest.testProducerCreation (#16742)
---
 .../broker/intercept/CounterBrokerInterceptor.java | 103 +++++++++++----------
 1 file changed, 52 insertions(+), 51 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index c56d1682c5b..25953a71fbd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.servlet.FilterChain;
 import javax.servlet.ServletException;
 import javax.servlet.ServletRequest;
@@ -48,32 +49,32 @@ import org.eclipse.jetty.server.Response;
 @Slf4j
 public class CounterBrokerInterceptor implements BrokerInterceptor {
 
-    int beforeSendCount = 0;
-    int count = 0;
-    int connectionCreationCount = 0;
-    int producerCount = 0;
-    int consumerCount = 0;
-    int messageCount = 0;
-    int messageDispatchCount = 0;
-    int messageAckCount = 0;
-    int handleAckCount = 0;
-    int txnCount = 0;
-    int committedTxnCount = 0;
-    int abortedTxnCount = 0;
+    private AtomicInteger beforeSendCount = new AtomicInteger();
+    private AtomicInteger count = new AtomicInteger();
+    private AtomicInteger connectionCreationCount = new AtomicInteger();
+    private AtomicInteger producerCount = new AtomicInteger();
+    private AtomicInteger consumerCount = new AtomicInteger();
+    private AtomicInteger messageCount = new AtomicInteger();
+    private AtomicInteger messageDispatchCount = new AtomicInteger();
+    private AtomicInteger messageAckCount = new AtomicInteger();
+    private AtomicInteger handleAckCount = new AtomicInteger();
+    private AtomicInteger txnCount = new AtomicInteger();
+    private AtomicInteger committedTxnCount = new AtomicInteger();
+    private AtomicInteger abortedTxnCount = new AtomicInteger();
 
     public void reset() {
-        beforeSendCount = 0;
-        count = 0;
-        connectionCreationCount = 0;
-        producerCount = 0;
-        consumerCount = 0;
-        messageCount = 0;
-        messageDispatchCount = 0;
-        messageAckCount = 0;
-        handleAckCount = 0;
-        txnCount = 0;
-        committedTxnCount = 0;
-        abortedTxnCount = 0;
+        beforeSendCount.set(0);
+        count.set(0);
+        connectionCreationCount.set(0);
+        producerCount.set(0);
+        consumerCount.set(0);
+        messageCount.set(0);
+        messageDispatchCount.set(0);
+        messageAckCount.set(0);
+        handleAckCount.set(0);
+        txnCount.set(0);
+        committedTxnCount.set(0);
+        abortedTxnCount.set(0);
     }
 
     private List<ResponseEvent> responseList = new ArrayList<>();
@@ -90,7 +91,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
         if (log.isDebugEnabled()) {
             log.debug("Connection created {}", cnx);
         }
-        connectionCreationCount++;
+        connectionCreationCount.incrementAndGet();
     }
 
     @Override
@@ -100,7 +101,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
             log.debug("Producer created with name={}, id={}",
                     producer.getProducerName(), producer.getProducerId());
         }
-        producerCount++;
+        producerCount.incrementAndGet();
     }
 
     @Override
@@ -111,7 +112,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
             log.debug("Consumer created with name={}, id={}",
                     consumer.consumerName(), consumer.consumerId());
         }
-        consumerCount++;
+        consumerCount.incrementAndGet();
     }
 
     @Override
@@ -122,7 +123,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
             log.debug("Message published topic={}, producer={}",
                     producer.getTopic().getName(), producer.getProducerName());
         }
-        messageCount++;
+        messageCount.incrementAndGet();
     }
 
     @Override
@@ -132,13 +133,13 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
             log.debug("Message dispatched topic={}, consumer={}",
                     consumer.getSubscription().getTopic().getName(), consumer.consumerName());
         }
-        messageDispatchCount++;
+        messageDispatchCount.incrementAndGet();
     }
 
     @Override
     public void messageAcked(ServerCnx cnx, Consumer consumer,
                               CommandAck ack) {
-        messageAckCount++;
+        messageAckCount.incrementAndGet();
     }
 
     @Override
@@ -150,7 +151,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
             log.debug("Send message to topic {}, subscription {}",
                     subscription.getTopic(), subscription.getName());
         }
-        beforeSendCount++;
+        beforeSendCount.incrementAndGet();
     }
 
     @Override
@@ -159,9 +160,9 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
             log.debug("[{}] On [{}] Pulsar command", count, command.getType().name());
         }
         if (command.getType().equals(BaseCommand.Type.ACK)) {
-            handleAckCount++;
+            handleAckCount.incrementAndGet();
         }
-        count ++;
+        count.incrementAndGet();
     }
 
     @Override
@@ -171,7 +172,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
 
     @Override
     public void onWebserviceRequest(ServletRequest request) {
-        count ++;
+        count.incrementAndGet();
         if (log.isDebugEnabled()) {
             log.debug("[{}] On [{}] Webservice request", count, ((HttpServletRequest) request).getRequestURL().toString());
         }
@@ -179,7 +180,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
 
     @Override
     public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
-        count ++;
+        count.incrementAndGet();
         if (log.isDebugEnabled()) {
             log.debug("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest) request).getRequestURL().toString(), response);
         }
@@ -192,21 +193,21 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
     @Override
     public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain)
             throws IOException, ServletException {
-        count = 100;
+        count.set(100);
         chain.doFilter(request, response);
     }
 
     @Override
     public void txnOpened(long tcId, String txnID) {
-        txnCount ++;
+        txnCount.incrementAndGet();
     }
 
     @Override
     public void txnEnded(String txnID, long txnAction) {
         if(txnAction == TxnAction.COMMIT_VALUE) {
-            committedTxnCount ++;
+            committedTxnCount.incrementAndGet();
         } else {
-            abortedTxnCount ++;
+            abortedTxnCount.incrementAndGet();
         }
     }
 
@@ -221,39 +222,39 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
     }
 
     public int getHandleAckCount() {
-        return handleAckCount;
+        return handleAckCount.get();
     }
 
     public int getCount() {
-        return count;
+        return count.get();
     }
 
     public int getProducerCount() {
-        return producerCount;
+        return producerCount.get();
     }
 
     public int getConsumerCount() {
-        return consumerCount;
+        return consumerCount.get();
     }
 
     public int getMessagePublishCount() {
-        return messageCount;
+        return messageCount.get();
     }
 
     public int getMessageDispatchCount() {
-        return messageDispatchCount;
+        return messageDispatchCount.get();
     }
 
     public int getMessageAckCount() {
-        return messageAckCount;
+        return messageAckCount.get();
     }
 
     public int getBeforeSendCount() {
-        return beforeSendCount;
+        return beforeSendCount.get();
     }
 
     public int getConnectionCreationCount() {
-        return connectionCreationCount;
+        return connectionCreationCount.get();
     }
 
     public void clearResponseList() {
@@ -265,14 +266,14 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
     }
 
     public int getTxnCount() {
-        return txnCount;
+        return txnCount.get();
     }
 
     public int getCommittedTxnCount() {
-        return committedTxnCount;
+        return committedTxnCount.get();
     }
 
     public int getAbortedTxnCount() {
-        return abortedTxnCount;
+        return abortedTxnCount.get();
     }
 }