You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/01 03:19:00 UTC
[pulsar] branch master updated: [Flaky Tests] Make PersistentTopicE2ETest#testBrokerConnectionStats tests stable. (#15372)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 42a235172d7 [Flaky Tests] Make PersistentTopicE2ETest#testBrokerConnectionStats tests stable. (#15372)
42a235172d7 is described below
commit 42a235172d73ee7f19923569e427a47b21275742
Author: Yan Zhao <ho...@apache.org>
AuthorDate: Sun May 1 11:18:47 2022 +0800
[Flaky Tests] Make PersistentTopicE2ETest#testBrokerConnectionStats tests stable. (#15372)
---
.../broker/service/PulsarChannelInitializer.java | 3 +-
.../broker/service/PersistentTopicE2ETest.java | 119 +++++++++++++++++----
2 files changed, 101 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index e75c518a50f..f4cd73109bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -59,7 +59,8 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
// This cache is used to maintain a list of active connections to iterate over them
// We keep weak references to have the cache to be auto cleaned up when the connections
// objects are GCed.
- private final Cache<SocketAddress, ServerCnx> connections = Caffeine.newBuilder()
+ @VisibleForTesting
+ protected final Cache<SocketAddress, ServerCnx> connections = Caffeine.newBuilder()
.weakKeys()
.weakValues()
.build();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index d1a495b1529..adaa8c2fd19 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -26,6 +26,8 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.SocketChannel;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -40,6 +42,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -48,6 +51,8 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -75,6 +80,7 @@ import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -94,13 +100,33 @@ import org.testng.annotations.Test;
public class PersistentTopicE2ETest extends BrokerTestBase {
private final List<AutoCloseable> closeables = new ArrayList<>();
+ private AtomicInteger activeCount;
+
+ private AtomicInteger inactiveCount;
+
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
+ this.activeCount = new AtomicInteger();
+ this.inactiveCount = new AtomicInteger();
conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
super.baseSetup();
}
+ @Override
+ protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
+ return new PulsarService(conf) {
+ @Override
+ protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
+ BrokerService broker = new BrokerService(this, ioEventLoopGroup);
+ broker.setPulsarChannelInitializerFactory((_pulsar, opts) -> {
+ return new PulsarChannelInitializerForTest(_pulsar, opts);
+ });
+ return broker;
+ }
+ };
+ }
+
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
@@ -113,6 +139,8 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
}
closeables.clear();
super.internalCleanup();
+ this.activeCount = null;
+ this.inactiveCount = null;
}
@Test
@@ -1490,7 +1518,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
assertTrue(msgInRate > 0);
}
- @Test(groups = "quarantine")
+ /**
+ * Detailed background on this test is given in the issue comment linked below.
+ * see: https://github.com/apache/pulsar/issues/10150#issuecomment-1112380074
+ */
+ @Test
public void testBrokerConnectionStats() throws Exception {
BrokerService brokerService = this.pulsar.getBrokerService();
@@ -1501,6 +1533,8 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
.create();
Map<String, Object> map = null;
+ Awaitility.await().until(() -> this.activeCount.get() == 1);
+
brokerService.updateRates();
List<Metrics> metrics = brokerService.getTopicMetrics();
for (int i = 0; i < metrics.size(); i++) {
@@ -1519,27 +1553,23 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
producer.close();
pulsarClient.close();
- Awaitility.await().until(() -> {
- brokerService.updateRates();
- List<Metrics> closeMetrics = brokerService.getTopicMetrics();
- Map<String, Object> closeMap = null;
- for (int i = 0; i < closeMetrics.size(); i++) {
- if (closeMetrics.get(i).getDimensions().containsValue("broker_connection")) {
- closeMap = closeMetrics.get(i).getMetrics();
- break;
- }
- }
+ Awaitility.await().until(() -> this.inactiveCount.get() == 1);
- if (closeMap != null && (long) closeMap.get("brk_connection_created_total_count") == 1
- && (long) closeMap.get("brk_active_connections") == 0
- && (long) closeMap.get("brk_connection_closed_total_count") == 1
- && (long) closeMap.get("brk_connection_create_fail_count") == 0
- && (long) closeMap.get("brk_connection_create_success_count") == 1) {
- return true;
- } else {
- return false;
+ map = null;
+ brokerService.updateRates();
+ metrics = brokerService.getTopicMetrics();
+ for (int i = 0; i < metrics.size(); i++) {
+ if (metrics.get(i).getDimensions().containsValue("broker_connection")) {
+ map = metrics.get(i).getMetrics();
+ break;
}
- });
+ }
+ assertNotNull(map);
+ assertEquals((long) map.get("brk_connection_created_total_count"), 1);
+ assertEquals((long) map.get("brk_active_connections"), 0);
+ assertEquals((long) map.get("brk_connection_closed_total_count"), 1);
+ assertEquals((long) map.get("brk_connection_create_success_count"), 1);
+ assertEquals((long) map.get("brk_connection_create_fail_count"), 0);
pulsar.getConfiguration().setAuthenticationEnabled(true);
if (pulsarClient != null) {
@@ -1557,6 +1587,9 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
assertTrue(e instanceof PulsarClientException.AuthenticationException);
}
+ Awaitility.await().until(() -> this.inactiveCount.get() == 2);
+
+ map = null;
brokerService.updateRates();
metrics = brokerService.getTopicMetrics();
for (int i = 0; i < metrics.size(); i++) {
@@ -1893,4 +1926,50 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
assertTrue(ex.getMessage().contains("Namespace not found"));
}
}
+
+ private class PulsarChannelInitializerForTest extends PulsarChannelInitializer {
+
+ private final PulsarService pulsar;
+
+ private final PulsarChannelOptions opts;
+
+ public PulsarChannelInitializerForTest(PulsarService pulsar, PulsarChannelOptions opts)
+ throws Exception {
+ super(pulsar, opts);
+ this.pulsar = pulsar;
+ this.opts = opts;
+ }
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ super.initChannel(ch);
+ // remove the "handler" pipeline entry installed by the parent initializer
+ ch.pipeline().remove("handler");
+ PersistentTopicE2ETest.ServerCnxForTest serverCnxForTest = new PersistentTopicE2ETest.ServerCnxForTest(this.pulsar, this.opts.getListenerName());
+ ch.pipeline().addAfter("flowController", "testHandler", serverCnxForTest);
+ // replace the parent's entry in the connections cache with the test handler
+ connections.put(ch.remoteAddress(), serverCnxForTest);
+ }
+
+ }
+
+ private class ServerCnxForTest extends ServerCnx {
+
+ public ServerCnxForTest(PulsarService pulsar, String listenerName) {
+ super(pulsar, listenerName);
+ }
+
+ @Override
+ protected void handleConnect(CommandConnect connect) {
+ // The `brk_connection_create_success_count` metric is incremented in handleConnect, so call super before bumping activeCount.
+ super.handleConnect(connect);
+ PersistentTopicE2ETest.this.activeCount.incrementAndGet();
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ super.channelInactive(ctx);
+ PersistentTopicE2ETest.this.inactiveCount.incrementAndGet();
+ }
+ }
}