You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/29 06:25:56 UTC

[pulsar] 16/17: [fix][test]: fix flaky test of ManagedCursorMetricsTest.testManagedCursorMetrics (#9919) (#14720)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 38057e42840f8becb4ae893133469b8cc26ee10f
Author: wuxuanqicn <89...@users.noreply.github.com>
AuthorDate: Thu Apr 28 11:57:39 2022 +0800

    [fix][test]: fix flaky test of ManagedCursorMetricsTest.testManagedCursorMetrics (#9919) (#14720)
    
    Fixes #9919
    
    ### Motivation
    
    we need make sure broker executed all ack command and updated metrics, then we can generate and check metric
    
    ### Modifications
    
    - enable AckReceipt
    - await until ack procedure complete(ACK and ACK_RESPONSE command)
    
    Co-authored-by: xuanqi.wu <xu...@weimob.com>
    (cherry picked from commit be7057a1fd878111e85ec112bac8f0b72350e744)
---
 .../broker/stats/ManagedCursorMetricsTest.java     | 26 +++++++++++++++++-----
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  1 +
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index 5e20c09fed1..4648ae2fb8f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -18,22 +18,27 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.PulsarTestClient;
 import org.apache.pulsar.common.stats.Metrics;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 @Test(groups = "broker")
 public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
 
@@ -49,6 +54,11 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
         super.internalCleanup();
     }
 
+    @Override
+    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
+        return PulsarTestClient.create(clientBuilder);
+    }
+
     @Test
     public void testManagedCursorMetrics() throws Exception {
         final String subName = "my-sub";
@@ -63,14 +73,18 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
         metricsList = metrics.generate();
         Assert.assertTrue(metricsList.isEmpty());
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+        PulsarTestClient pulsarClient = (PulsarTestClient) this.pulsarClient;
+        @Cleanup
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionType(SubscriptionType.Shared)
                 .ackTimeout(1, TimeUnit.SECONDS)
                 .subscriptionName(subName)
+                .isAckReceiptEnabled(true)
                 .subscribe();
 
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        @Cleanup
+        Producer<byte[]> producer = this.pulsarClient.newProducer()
                 .topic(topicName)
                 .create();
 
@@ -83,6 +97,8 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
             producer.send(message.getBytes());
             consumer.acknowledge(consumer.receive().getMessageId());
         }
+
+        Awaitility.await().until(() -> pulsarClient.getConnection(topicName).get().getPendingRequests().size() == 0);
         metricsList = metrics.generate();
         Assert.assertFalse(metricsList.isEmpty());
         Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 0L);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 00b8e4f7561..a8d1cf51c71 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -109,6 +109,7 @@ public class ClientCnx extends PulsarHandler {
     protected final Authentication authentication;
     private State state;
 
+    @Getter
     private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
             ConcurrentLongHashMap.<TimedCompletableFuture<? extends Object>>newBuilder()
                     .expectedItems(16)