You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/01 10:18:14 UTC
[pulsar] branch branch-2.9 updated: [fix][flaky-test] BrokerInterceptorTest.testProducerCreation (#16742)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new f8692eb0b58 [fix][flaky-test] BrokerInterceptorTest.testProducerCreation (#16742)
f8692eb0b58 is described below
commit f8692eb0b58e8c1ff0358e11a76c227cef2d04be
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Mon Jul 25 14:05:27 2022 +0800
[fix][flaky-test] BrokerInterceptorTest.testProducerCreation (#16742)
(cherry picked from commit bc94643bc1a7f365dfb75e389ac3e1156770a119)
---
.../broker/intercept/CounterBrokerInterceptor.java | 109 ++++++++++++---------
1 file changed, 65 insertions(+), 44 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index 3b0044386cc..9f0339121cf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.Map;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
@@ -41,23 +42,35 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.CommandAck;
-import org.apache.pulsar.common.stats.Rate;
import org.eclipse.jetty.server.Response;
@Slf4j
public class CounterBrokerInterceptor implements BrokerInterceptor {
- int beforeSendCount = 0;
- int count = 0;
- int connectionCreationCount = 0;
- int producerCount = 0;
- int consumerCount = 0;
- int messageCount = 0;
- int messageDispatchCount = 0;
- int messageAckCount = 0;
+ private final AtomicInteger beforeSendCount = new AtomicInteger();
+ private final AtomicInteger count = new AtomicInteger();
+ private final AtomicInteger connectionCreationCount = new AtomicInteger();
+ private final AtomicInteger producerCount = new AtomicInteger();
+ private final AtomicInteger consumerCount = new AtomicInteger();
+ private final AtomicInteger messageCount = new AtomicInteger();
+ private final AtomicInteger messageDispatchCount = new AtomicInteger();
+ private final AtomicInteger messageAckCount = new AtomicInteger();
- private List<ResponseEvent> responseList = new ArrayList<>();
+
+
+ public void reset() {
+ beforeSendCount.set(0);
+ count.set(0);
+ connectionCreationCount.set(0);
+ producerCount.set(0);
+ consumerCount.set(0);
+ messageCount.set(0);
+ messageDispatchCount.set(0);
+ messageAckCount.set(0);
+ }
+
+ private final List<ResponseEvent> responseList = new ArrayList<>();
@Data
@AllArgsConstructor
@@ -69,47 +82,47 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
@Override
public void onConnectionCreated(ServerCnx cnx){
log.info("Connection created {}", cnx);
- connectionCreationCount++;
+ connectionCreationCount.incrementAndGet();
}
@Override
public void producerCreated(ServerCnx cnx, Producer producer,
Map<String, String> metadata){
log.info("Producer created with name={}, id={}",
- producer.getProducerName(), producer.getProducerId());
- producerCount++;
+ producer.getProducerName(), producer.getProducerId());
+ producerCount.incrementAndGet();
}
@Override
public void consumerCreated(ServerCnx cnx,
- Consumer consumer,
- Map<String, String> metadata) {
+ Consumer consumer,
+ Map<String, String> metadata) {
log.info("Consumer created with name={}, id={}",
- consumer.consumerName(), consumer.consumerId());
- consumerCount++;
+ consumer.consumerName(), consumer.consumerId());
+ consumerCount.incrementAndGet();
}
@Override
public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
- long entryId,
- Topic.PublishContext publishContext) {
+ long entryId,
+ Topic.PublishContext publishContext) {
log.info("Message published topic={}, producer={}",
- producer.getTopic().getName(), producer.getProducerName());
- messageCount++;
+ producer.getTopic().getName(), producer.getProducerName());
+ messageCount.incrementAndGet();
}
@Override
public void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
- long entryId, ByteBuf headersAndPayload) {
+ long entryId, ByteBuf headersAndPayload) {
log.info("Message dispatched topic={}, consumer={}",
- consumer.getSubscription().getTopic().getName(), consumer.consumerName());
- messageDispatchCount++;
+ consumer.getSubscription().getTopic().getName(), consumer.consumerName());
+ messageDispatchCount.incrementAndGet();
}
@Override
public void messageAcked(ServerCnx cnx, Consumer consumer,
- CommandAck ack) {
- messageAckCount++;
+ CommandAck ack) {
+ messageAckCount.incrementAndGet();
}
@Override
@@ -117,15 +130,19 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata) {
- log.debug("Send message to topic {}, subscription {}",
- subscription.getTopic(), subscription.getName());
- beforeSendCount++;
+ if (log.isDebugEnabled()) {
+ log.debug("Send message to topic {}, subscription {}",
+ subscription.getTopic(), subscription.getName());
+ }
+ beforeSendCount.incrementAndGet();
}
@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) {
- log.debug("[{}] On [{}] Pulsar command", count, command.getType().name());
- count ++;
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] On [{}] Pulsar command", count, command.getType().name());
+ }
+ count.incrementAndGet();
}
@Override
@@ -135,14 +152,18 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
@Override
public void onWebserviceRequest(ServletRequest request) {
- count ++;
- log.debug("[{}] On [{}] Webservice request", count, ((HttpServletRequest)request).getRequestURL().toString());
+ count.incrementAndGet();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] On [{}] Webservice request", count, ((HttpServletRequest) request).getRequestURL().toString());
+ }
}
@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
- count ++;
- log.debug("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest)request).getRequestURL().toString(), response);
+ count.incrementAndGet();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest) request).getRequestURL().toString(), response);
+ }
if (response instanceof Response) {
Response res = (Response) response;
responseList.add(new ResponseEvent(res.getHttpChannel().getRequest().getRequestURI(), res.getStatus()));
@@ -152,7 +173,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
@Override
public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
- count = 100;
+ count.set(100);
chain.doFilter(request, response);
}
@@ -167,35 +188,35 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
}
public int getCount() {
- return count;
+ return count.get();
}
public int getProducerCount() {
- return producerCount;
+ return producerCount.get();
}
public int getConsumerCount() {
- return consumerCount;
+ return consumerCount.get();
}
public int getMessagePublishCount() {
- return messageCount;
+ return messageCount.get();
}
public int getMessageDispatchCount() {
- return messageDispatchCount;
+ return messageDispatchCount.get();
}
public int getMessageAckCount() {
- return messageAckCount;
+ return messageAckCount.get();
}
public int getBeforeSendCount() {
- return beforeSendCount;
+ return beforeSendCount.get();
}
public int getConnectionCreationCount() {
- return connectionCreationCount;
+ return connectionCreationCount.get();
}
public void clearResponseList() {