You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/29 06:25:56 UTC
[pulsar] 16/17: [fix][test]: fix flaky test of ManagedCursorMetricsTest.testManagedCursorMetrics (#9919) (#14720)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 38057e42840f8becb4ae893133469b8cc26ee10f
Author: wuxuanqicn <89...@users.noreply.github.com>
AuthorDate: Thu Apr 28 11:57:39 2022 +0800
[fix][test]: fix flaky test of ManagedCursorMetricsTest.testManagedCursorMetrics (#9919) (#14720)
Fixes #9919
### Motivation
we need make sure broker executed all ack command and updated metrics, then we can generate and check metric
### Modifications
- enable AckReceipt
- await until ack procedure complete(ACK and ACK_RESPONSE command)
Co-authored-by: xuanqi.wu <xu...@weimob.com>
(cherry picked from commit be7057a1fd878111e85ec112bac8f0b72350e744)
---
.../broker/stats/ManagedCursorMetricsTest.java | 26 +++++++++++++++++-----
.../org/apache/pulsar/client/impl/ClientCnx.java | 1 +
2 files changed, 22 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index 5e20c09fed1..4648ae2fb8f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -18,22 +18,27 @@
*/
package org.apache.pulsar.broker.stats;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.PulsarTestClient;
import org.apache.pulsar.common.stats.Metrics;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
@Test(groups = "broker")
public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
@@ -49,6 +54,11 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
super.internalCleanup();
}
+ @Override
+ protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
+ return PulsarTestClient.create(clientBuilder);
+ }
+
@Test
public void testManagedCursorMetrics() throws Exception {
final String subName = "my-sub";
@@ -63,14 +73,18 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());
- Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ PulsarTestClient pulsarClient = (PulsarTestClient) this.pulsarClient;
+ @Cleanup
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName)
+ .isAckReceiptEnabled(true)
.subscribe();
- Producer<byte[]> producer = pulsarClient.newProducer()
+ @Cleanup
+ Producer<byte[]> producer = this.pulsarClient.newProducer()
.topic(topicName)
.create();
@@ -83,6 +97,8 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
producer.send(message.getBytes());
consumer.acknowledge(consumer.receive().getMessageId());
}
+
+ Awaitility.await().until(() -> pulsarClient.getConnection(topicName).get().getPendingRequests().size() == 0);
metricsList = metrics.generate();
Assert.assertFalse(metricsList.isEmpty());
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 0L);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 00b8e4f7561..a8d1cf51c71 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -109,6 +109,7 @@ public class ClientCnx extends PulsarHandler {
protected final Authentication authentication;
private State state;
+ @Getter
private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
ConcurrentLongHashMap.<TimedCompletableFuture<? extends Object>>newBuilder()
.expectedItems(16)