You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/04 12:59:46 UTC

[pulsar] branch branch-2.10 updated: [fix][flaky-test]ManagedCursorMetricsTest.testCursorReadWriteMetrics (#17045)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new eece41bed3e [fix][flaky-test]ManagedCursorMetricsTest.testCursorReadWriteMetrics (#17045)
eece41bed3e is described below

commit eece41bed3e9799f9876e028325557c9d1f6c62f
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Fri Aug 12 20:25:38 2022 +0800

    [fix][flaky-test]ManagedCursorMetricsTest.testCursorReadWriteMetrics (#17045)
    
    (cherry picked from commit e0ff3d7e0083d4ecd9b4af55e11a6bf9e461dfc9)
---
 .../broker/stats/ManagedCursorMetricsTest.java     | 27 +++++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index 4648ae2fb8f..60e74d5ca20 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -19,10 +19,14 @@
 package org.apache.pulsar.broker.stats;
 
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
+import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
@@ -108,9 +112,19 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
     }
 
+    private ManagedCursorMXBean getManagedCursorMXBean(String topicName, String subscriptionName)
+            throws ExecutionException, InterruptedException {
+        final PersistentSubscription persistentSubscription =
+                (PersistentSubscription) pulsar.getBrokerService()
+                        .getTopic(topicName, false).get().get().getSubscription(subscriptionName);
+        final ManagedCursorImpl managedCursor = (ManagedCursorImpl) persistentSubscription.getCursor();
+        return managedCursor.getStats();
+    }
+
     @Test
     public void testCursorReadWriteMetrics() throws Exception {
-        final String subName = "read-write";
+        final String subName1 = "read-write-sub-1";
+        final String subName2 = "read-write-sub-2";
         final String topicName = "persistent://my-namespace/use/my-ns/read-write";
         final int messageSize = 10;
 
@@ -127,7 +141,7 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
                 .topic(topicName)
                 .subscriptionType(SubscriptionType.Shared)
                 .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName)
+                .subscriptionName(subName1)
                 .subscribe();
 
         @Cleanup
@@ -135,7 +149,7 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
                 .topic(topicName)
                 .subscriptionType(SubscriptionType.Shared)
                 .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "-2")
+                .subscriptionName(subName2)
                 .subscribe();
 
         @Cleanup
@@ -156,6 +170,13 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
                 consumer2.acknowledge(consumer.receive().getMessageId());
             }
         }
+
+        // Wait for persistent cursor meta.
+        ManagedCursorMXBean cursorMXBean1 = getManagedCursorMXBean(topicName, subName1);
+        ManagedCursorMXBean cursorMXBean2 = getManagedCursorMXBean(topicName, subName2);
+        Awaitility.await().until(() -> cursorMXBean1.getWriteCursorLedgerLogicalSize() > 0);
+        Awaitility.await().until(() -> cursorMXBean2.getWriteCursorLedgerLogicalSize() > 0);
+
         metricsList = metrics.generate();
         Assert.assertEquals(metricsList.size(), 2);
         Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);