You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/01 10:18:14 UTC

[pulsar] branch branch-2.9 updated: [fix][flaky-test] BrokerInterceptorTest.testProducerCreation (#16742)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new f8692eb0b58 [fix][flaky-test] BrokerInterceptorTest.testProducerCreation (#16742)
f8692eb0b58 is described below

commit f8692eb0b58e8c1ff0358e11a76c227cef2d04be
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Mon Jul 25 14:05:27 2022 +0800

    [fix][flaky-test] BrokerInterceptorTest.testProducerCreation (#16742)
    
    (cherry picked from commit bc94643bc1a7f365dfb75e389ac3e1156770a119)
---
 .../broker/intercept/CounterBrokerInterceptor.java | 109 ++++++++++++---------
 1 file changed, 65 insertions(+), 44 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index 3b0044386cc..9f0339121cf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.Map;
 import javax.servlet.FilterChain;
 import javax.servlet.ServletException;
@@ -41,23 +42,35 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.CommandAck;
-import org.apache.pulsar.common.stats.Rate;
 import org.eclipse.jetty.server.Response;
 
 
 @Slf4j
 public class CounterBrokerInterceptor implements BrokerInterceptor {
 
-    int beforeSendCount = 0;
-    int count = 0;
-    int connectionCreationCount = 0;
-    int producerCount = 0;
-    int consumerCount = 0;
-    int messageCount = 0;
-    int messageDispatchCount = 0;
-    int messageAckCount = 0;
+    private final AtomicInteger beforeSendCount = new AtomicInteger();
+    private final AtomicInteger count = new AtomicInteger();
+    private final AtomicInteger connectionCreationCount = new AtomicInteger();
+    private final AtomicInteger producerCount = new AtomicInteger();
+    private final AtomicInteger consumerCount = new AtomicInteger();
+    private final AtomicInteger messageCount = new AtomicInteger();
+    private final AtomicInteger messageDispatchCount = new AtomicInteger();
+    private final AtomicInteger messageAckCount = new AtomicInteger();
 
-    private List<ResponseEvent> responseList = new ArrayList<>();
+
+
+    public void reset() {
+        beforeSendCount.set(0);
+        count.set(0);
+        connectionCreationCount.set(0);
+        producerCount.set(0);
+        consumerCount.set(0);
+        messageCount.set(0);
+        messageDispatchCount.set(0);
+        messageAckCount.set(0);
+    }
+
+    private final List<ResponseEvent> responseList = new ArrayList<>();
 
     @Data
     @AllArgsConstructor
@@ -69,47 +82,47 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
     @Override
     public void onConnectionCreated(ServerCnx cnx){
         log.info("Connection created {}", cnx);
-        connectionCreationCount++;
+        connectionCreationCount.incrementAndGet();
     }
 
     @Override
     public void producerCreated(ServerCnx cnx, Producer producer,
                                 Map<String, String> metadata){
         log.info("Producer created with name={}, id={}",
-            producer.getProducerName(), producer.getProducerId());
-        producerCount++;
+                producer.getProducerName(), producer.getProducerId());
+        producerCount.incrementAndGet();
     }
 
     @Override
     public void consumerCreated(ServerCnx cnx,
-                                 Consumer consumer,
-                                 Map<String, String> metadata) {
+                                Consumer consumer,
+                                Map<String, String> metadata) {
         log.info("Consumer created with name={}, id={}",
-            consumer.consumerName(), consumer.consumerId());
-        consumerCount++;
+                consumer.consumerName(), consumer.consumerId());
+        consumerCount.incrementAndGet();
     }
 
     @Override
     public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
-                                 long entryId,
-                                 Topic.PublishContext publishContext) {
+                                long entryId,
+                                Topic.PublishContext publishContext) {
         log.info("Message published topic={}, producer={}",
-            producer.getTopic().getName(), producer.getProducerName());
-        messageCount++;
+                producer.getTopic().getName(), producer.getProducerName());
+        messageCount.incrementAndGet();
     }
 
     @Override
     public void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
-                                   long entryId, ByteBuf headersAndPayload) {
+                                  long entryId, ByteBuf headersAndPayload) {
         log.info("Message dispatched topic={}, consumer={}",
-            consumer.getSubscription().getTopic().getName(), consumer.consumerName());
-        messageDispatchCount++;
+                consumer.getSubscription().getTopic().getName(), consumer.consumerName());
+        messageDispatchCount.incrementAndGet();
     }
 
     @Override
     public void messageAcked(ServerCnx cnx, Consumer consumer,
-                              CommandAck ack) {
-        messageAckCount++;
+                             CommandAck ack) {
+        messageAckCount.incrementAndGet();
     }
 
     @Override
@@ -117,15 +130,19 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
                                   Entry entry,
                                   long[] ackSet,
                                   MessageMetadata msgMetadata) {
-        log.debug("Send message to topic {}, subscription {}",
-            subscription.getTopic(), subscription.getName());
-        beforeSendCount++;
+        if (log.isDebugEnabled()) {
+            log.debug("Send message to topic {}, subscription {}",
+                    subscription.getTopic(), subscription.getName());
+        }
+        beforeSendCount.incrementAndGet();
     }
 
     @Override
     public void onPulsarCommand(BaseCommand command, ServerCnx cnx) {
-        log.debug("[{}] On [{}] Pulsar command", count, command.getType().name());
-        count ++;
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] On [{}] Pulsar command", count, command.getType().name());
+        }
+        count.incrementAndGet();
     }
 
     @Override
@@ -135,14 +152,18 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
 
     @Override
     public void onWebserviceRequest(ServletRequest request) {
-        count ++;
-        log.debug("[{}] On [{}] Webservice request", count, ((HttpServletRequest)request).getRequestURL().toString());
+        count.incrementAndGet();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] On [{}] Webservice request", count, ((HttpServletRequest) request).getRequestURL().toString());
+        }
     }
 
     @Override
     public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
-        count ++;
-        log.debug("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest)request).getRequestURL().toString(), response);
+        count.incrementAndGet();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest) request).getRequestURL().toString(), response);
+        }
         if (response instanceof Response) {
             Response res = (Response) response;
             responseList.add(new ResponseEvent(res.getHttpChannel().getRequest().getRequestURI(), res.getStatus()));
@@ -152,7 +173,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
     @Override
     public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain)
             throws IOException, ServletException {
-        count = 100;
+        count.set(100);
         chain.doFilter(request, response);
     }
 
@@ -167,35 +188,35 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
     }
 
     public int getCount() {
-        return count;
+        return count.get();
     }
 
     public int getProducerCount() {
-        return producerCount;
+        return producerCount.get();
     }
 
     public int getConsumerCount() {
-        return consumerCount;
+        return consumerCount.get();
     }
 
     public int getMessagePublishCount() {
-        return messageCount;
+        return messageCount.get();
     }
 
     public int getMessageDispatchCount() {
-        return messageDispatchCount;
+        return messageDispatchCount.get();
     }
 
     public int getMessageAckCount() {
-        return messageAckCount;
+        return messageAckCount.get();
     }
 
     public int getBeforeSendCount() {
-        return beforeSendCount;
+        return beforeSendCount.get();
     }
 
     public int getConnectionCreationCount() {
-        return connectionCreationCount;
+        return connectionCreationCount.get();
     }
 
     public void clearResponseList() {