You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/25 06:05:33 UTC
[pulsar] branch master updated: [fix][flaky-test] BrokerInterceptorTest.testProducerCreation (#16742)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bc94643bc1a [fix][flaky-test] BrokerInterceptorTest.testProducerCreation (#16742)
bc94643bc1a is described below
commit bc94643bc1a7f365dfb75e389ac3e1156770a119
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Mon Jul 25 14:05:27 2022 +0800
[fix][flaky-test] BrokerInterceptorTest.testProducerCreation (#16742)
---
.../broker/intercept/CounterBrokerInterceptor.java | 103 +++++++++++----------
1 file changed, 52 insertions(+), 51 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index c56d1682c5b..25953a71fbd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
@@ -48,32 +49,32 @@ import org.eclipse.jetty.server.Response;
@Slf4j
public class CounterBrokerInterceptor implements BrokerInterceptor {
- int beforeSendCount = 0;
- int count = 0;
- int connectionCreationCount = 0;
- int producerCount = 0;
- int consumerCount = 0;
- int messageCount = 0;
- int messageDispatchCount = 0;
- int messageAckCount = 0;
- int handleAckCount = 0;
- int txnCount = 0;
- int committedTxnCount = 0;
- int abortedTxnCount = 0;
+ private AtomicInteger beforeSendCount = new AtomicInteger();
+ private AtomicInteger count = new AtomicInteger();
+ private AtomicInteger connectionCreationCount = new AtomicInteger();
+ private AtomicInteger producerCount = new AtomicInteger();
+ private AtomicInteger consumerCount = new AtomicInteger();
+ private AtomicInteger messageCount = new AtomicInteger();
+ private AtomicInteger messageDispatchCount = new AtomicInteger();
+ private AtomicInteger messageAckCount = new AtomicInteger();
+ private AtomicInteger handleAckCount = new AtomicInteger();
+ private AtomicInteger txnCount = new AtomicInteger();
+ private AtomicInteger committedTxnCount = new AtomicInteger();
+ private AtomicInteger abortedTxnCount = new AtomicInteger();
public void reset() {
- beforeSendCount = 0;
- count = 0;
- connectionCreationCount = 0;
- producerCount = 0;
- consumerCount = 0;
- messageCount = 0;
- messageDispatchCount = 0;
- messageAckCount = 0;
- handleAckCount = 0;
- txnCount = 0;
- committedTxnCount = 0;
- abortedTxnCount = 0;
+ beforeSendCount.set(0);
+ count.set(0);
+ connectionCreationCount.set(0);
+ producerCount.set(0);
+ consumerCount.set(0);
+ messageCount.set(0);
+ messageDispatchCount.set(0);
+ messageAckCount.set(0);
+ handleAckCount.set(0);
+ txnCount.set(0);
+ committedTxnCount.set(0);
+ abortedTxnCount.set(0);
}
private List<ResponseEvent> responseList = new ArrayList<>();
@@ -90,7 +91,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
if (log.isDebugEnabled()) {
log.debug("Connection created {}", cnx);
}
- connectionCreationCount++;
+ connectionCreationCount.incrementAndGet();
}
@Override
@@ -100,7 +101,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
log.debug("Producer created with name={}, id={}",
producer.getProducerName(), producer.getProducerId());
}
- producerCount++;
+ producerCount.incrementAndGet();
}
@Override
@@ -111,7 +112,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
log.debug("Consumer created with name={}, id={}",
consumer.consumerName(), consumer.consumerId());
}
- consumerCount++;
+ consumerCount.incrementAndGet();
}
@Override
@@ -122,7 +123,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
log.debug("Message published topic={}, producer={}",
producer.getTopic().getName(), producer.getProducerName());
}
- messageCount++;
+ messageCount.incrementAndGet();
}
@Override
@@ -132,13 +133,13 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
log.debug("Message dispatched topic={}, consumer={}",
consumer.getSubscription().getTopic().getName(), consumer.consumerName());
}
- messageDispatchCount++;
+ messageDispatchCount.incrementAndGet();
}
@Override
public void messageAcked(ServerCnx cnx, Consumer consumer,
CommandAck ack) {
- messageAckCount++;
+ messageAckCount.incrementAndGet();
}
@Override
@@ -150,7 +151,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
log.debug("Send message to topic {}, subscription {}",
subscription.getTopic(), subscription.getName());
}
- beforeSendCount++;
+ beforeSendCount.incrementAndGet();
}
@Override
@@ -159,9 +160,9 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
log.debug("[{}] On [{}] Pulsar command", count, command.getType().name());
}
if (command.getType().equals(BaseCommand.Type.ACK)) {
- handleAckCount++;
+ handleAckCount.incrementAndGet();
}
- count ++;
+ count.incrementAndGet();
}
@Override
@@ -171,7 +172,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
@Override
public void onWebserviceRequest(ServletRequest request) {
- count ++;
+ count.incrementAndGet();
if (log.isDebugEnabled()) {
log.debug("[{}] On [{}] Webservice request", count, ((HttpServletRequest) request).getRequestURL().toString());
}
@@ -179,7 +180,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
- count ++;
+ count.incrementAndGet();
if (log.isDebugEnabled()) {
log.debug("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest) request).getRequestURL().toString(), response);
}
@@ -192,21 +193,21 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
@Override
public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
- count = 100;
+ count.set(100);
chain.doFilter(request, response);
}
@Override
public void txnOpened(long tcId, String txnID) {
- txnCount ++;
+ txnCount.incrementAndGet();
}
@Override
public void txnEnded(String txnID, long txnAction) {
if(txnAction == TxnAction.COMMIT_VALUE) {
- committedTxnCount ++;
+ committedTxnCount.incrementAndGet();
} else {
- abortedTxnCount ++;
+ abortedTxnCount.incrementAndGet();
}
}
@@ -221,39 +222,39 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
}
public int getHandleAckCount() {
- return handleAckCount;
+ return handleAckCount.get();
}
public int getCount() {
- return count;
+ return count.get();
}
public int getProducerCount() {
- return producerCount;
+ return producerCount.get();
}
public int getConsumerCount() {
- return consumerCount;
+ return consumerCount.get();
}
public int getMessagePublishCount() {
- return messageCount;
+ return messageCount.get();
}
public int getMessageDispatchCount() {
- return messageDispatchCount;
+ return messageDispatchCount.get();
}
public int getMessageAckCount() {
- return messageAckCount;
+ return messageAckCount.get();
}
public int getBeforeSendCount() {
- return beforeSendCount;
+ return beforeSendCount.get();
}
public int getConnectionCreationCount() {
- return connectionCreationCount;
+ return connectionCreationCount.get();
}
public void clearResponseList() {
@@ -265,14 +266,14 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
}
public int getTxnCount() {
- return txnCount;
+ return txnCount.get();
}
public int getCommittedTxnCount() {
- return committedTxnCount;
+ return committedTxnCount.get();
}
public int getAbortedTxnCount() {
- return abortedTxnCount;
+ return abortedTxnCount.get();
}
}