You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/01 03:19:00 UTC

[pulsar] branch master updated: [Flaky Tests] Make PersistentTopicE2ETest#testBrokerConnectionStats tests stable. (#15372)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 42a235172d7 [Flaky Tests] Make PersistentTopicE2ETest#testBrokerConnectionStats tests stable. (#15372)
42a235172d7 is described below

commit 42a235172d73ee7f19923569e427a47b21275742
Author: Yan Zhao <ho...@apache.org>
AuthorDate: Sun May 1 11:18:47 2022 +0800

    [Flaky Tests] Make PersistentTopicE2ETest#testBrokerConnectionStats tests stable. (#15372)
---
 .../broker/service/PulsarChannelInitializer.java   |   3 +-
 .../broker/service/PersistentTopicE2ETest.java     | 119 +++++++++++++++++----
 2 files changed, 101 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index e75c518a50f..f4cd73109bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -59,7 +59,8 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
     // This cache is used to maintain a list of active connections to iterate over them
     // We keep weak references to have the cache to be auto cleaned up when the connections
     // objects are GCed.
-    private final Cache<SocketAddress, ServerCnx> connections = Caffeine.newBuilder()
+    @VisibleForTesting
+    protected final Cache<SocketAddress, ServerCnx> connections = Caffeine.newBuilder()
             .weakKeys()
             .weakValues()
             .build();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index d1a495b1529..adaa8c2fd19 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -26,6 +26,8 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.SocketChannel;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -40,6 +42,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -48,6 +51,8 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -75,6 +80,7 @@ import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.common.api.proto.CommandConnect;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -94,13 +100,33 @@ import org.testng.annotations.Test;
 public class PersistentTopicE2ETest extends BrokerTestBase {
     private final List<AutoCloseable> closeables = new ArrayList<>();
 
+    private AtomicInteger activeCount;
+
+    private AtomicInteger inactiveCount;
+
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
+        this.activeCount = new AtomicInteger();
+        this.inactiveCount = new AtomicInteger();
         conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
         super.baseSetup();
     }
 
+    @Override
+    protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
+        return new PulsarService(conf) {
+            @Override
+            protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
+                BrokerService broker = new BrokerService(this, ioEventLoopGroup);
+                broker.setPulsarChannelInitializerFactory((_pulsar, opts) -> {
+                            return new PulsarChannelInitializerForTest(_pulsar, opts);
+                    });
+                return broker;
+            }
+        };
+    }
+
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
@@ -113,6 +139,8 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         }
         closeables.clear();
         super.internalCleanup();
+        this.activeCount = null;
+        this.inactiveCount = null;
     }
 
     @Test
@@ -1490,7 +1518,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         assertTrue(msgInRate > 0);
     }
 
-    @Test(groups = "quarantine")
+    /**
+     * Detailed background on this test is available in the issue comment linked below.
+     * see: https://github.com/apache/pulsar/issues/10150#issuecomment-1112380074
+     */
+    @Test
     public void testBrokerConnectionStats() throws Exception {
 
         BrokerService brokerService = this.pulsar.getBrokerService();
@@ -1501,6 +1533,8 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
                 .create();
         Map<String, Object> map = null;
 
+        Awaitility.await().until(() -> this.activeCount.get() == 1);
+
         brokerService.updateRates();
         List<Metrics> metrics = brokerService.getTopicMetrics();
         for (int i = 0; i < metrics.size(); i++) {
@@ -1519,27 +1553,23 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         producer.close();
         pulsarClient.close();
 
-        Awaitility.await().until(() -> {
-            brokerService.updateRates();
-            List<Metrics> closeMetrics = brokerService.getTopicMetrics();
-            Map<String, Object> closeMap = null;
-            for (int i = 0; i < closeMetrics.size(); i++) {
-                if (closeMetrics.get(i).getDimensions().containsValue("broker_connection")) {
-                    closeMap = closeMetrics.get(i).getMetrics();
-                    break;
-                }
-            }
+        Awaitility.await().until(() -> this.inactiveCount.get() == 1);
 
-            if (closeMap != null && (long) closeMap.get("brk_connection_created_total_count") == 1
-                    && (long) closeMap.get("brk_active_connections") == 0
-                    && (long) closeMap.get("brk_connection_closed_total_count") == 1
-                    && (long) closeMap.get("brk_connection_create_fail_count") == 0
-                    && (long) closeMap.get("brk_connection_create_success_count") == 1) {
-                return true;
-            } else {
-                return false;
+        map = null;
+        brokerService.updateRates();
+        metrics = brokerService.getTopicMetrics();
+        for (int i = 0; i < metrics.size(); i++) {
+            if (metrics.get(i).getDimensions().containsValue("broker_connection")) {
+                map = metrics.get(i).getMetrics();
+                break;
             }
-        });
+        }
+        assertNotNull(map);
+        assertEquals((long) map.get("brk_connection_created_total_count"), 1);
+        assertEquals((long) map.get("brk_active_connections"), 0);
+        assertEquals((long) map.get("brk_connection_closed_total_count"), 1);
+        assertEquals((long) map.get("brk_connection_create_success_count"), 1);
+        assertEquals((long) map.get("brk_connection_create_fail_count"), 0);
 
         pulsar.getConfiguration().setAuthenticationEnabled(true);
         if (pulsarClient != null) {
@@ -1557,6 +1587,9 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
             assertTrue(e instanceof PulsarClientException.AuthenticationException);
         }
 
+        Awaitility.await().until(() -> this.inactiveCount.get() == 2);
+
+        map = null;
         brokerService.updateRates();
         metrics = brokerService.getTopicMetrics();
         for (int i = 0; i < metrics.size(); i++) {
@@ -1893,4 +1926,50 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
             assertTrue(ex.getMessage().contains("Namespace not found"));
         }
     }
+
+    private class PulsarChannelInitializerForTest extends PulsarChannelInitializer {
+
+        private final PulsarService pulsar;
+
+        private final PulsarChannelOptions opts;
+
+        public PulsarChannelInitializerForTest(PulsarService pulsar, PulsarChannelOptions opts)
+                throws Exception {
+            super(pulsar, opts);
+            this.pulsar = pulsar;
+            this.opts = opts;
+        }
+
+        @Override
+        protected void initChannel(SocketChannel ch) throws Exception {
+            super.initChannel(ch);
+            //remove parent
+            ch.pipeline().remove("handler");
+            PersistentTopicE2ETest.ServerCnxForTest serverCnxForTest = new PersistentTopicE2ETest.ServerCnxForTest(this.pulsar, this.opts.getListenerName());
+            ch.pipeline().addAfter("flowController", "testHandler", serverCnxForTest);
+            //override parent
+            connections.put(ch.remoteAddress(), serverCnxForTest);
+        }
+
+    }
+
+    private class ServerCnxForTest extends ServerCnx {
+
+        public ServerCnxForTest(PulsarService pulsar, String listenerName) {
+            super(pulsar, listenerName);
+        }
+
+        @Override
+        protected void handleConnect(CommandConnect connect) {
+            // The metric `brk_connection_create_success_count` is incremented in handleConnect.
+            super.handleConnect(connect);
+            PersistentTopicE2ETest.this.activeCount.incrementAndGet();
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+            super.channelInactive(ctx);
+            PersistentTopicE2ETest.this.inactiveCount.incrementAndGet();
+        }
+    }
 }