You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/04 12:59:46 UTC
[pulsar] branch branch-2.10 updated: [fix][flaky-test]ManagedCursorMetricsTest.testCursorReadWriteMetrics (#17045)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new eece41bed3e [fix][flaky-test]ManagedCursorMetricsTest.testCursorReadWriteMetrics (#17045)
eece41bed3e is described below
commit eece41bed3e9799f9876e028325557c9d1f6c62f
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Fri Aug 12 20:25:38 2022 +0800
[fix][flaky-test]ManagedCursorMetricsTest.testCursorReadWriteMetrics (#17045)
(cherry picked from commit e0ff3d7e0083d4ecd9b4af55e11a6bf9e461dfc9)
---
.../broker/stats/ManagedCursorMetricsTest.java | 27 +++++++++++++++++++---
1 file changed, 24 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index 4648ae2fb8f..60e74d5ca20 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -19,10 +19,14 @@
package org.apache.pulsar.broker.stats;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
+import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
@@ -108,9 +112,19 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
}
+ private ManagedCursorMXBean getManagedCursorMXBean(String topicName, String subscriptionName)
+ throws ExecutionException, InterruptedException {
+ final PersistentSubscription persistentSubscription =
+ (PersistentSubscription) pulsar.getBrokerService()
+ .getTopic(topicName, false).get().get().getSubscription(subscriptionName);
+ final ManagedCursorImpl managedCursor = (ManagedCursorImpl) persistentSubscription.getCursor();
+ return managedCursor.getStats();
+ }
+
@Test
public void testCursorReadWriteMetrics() throws Exception {
- final String subName = "read-write";
+ final String subName1 = "read-write-sub-1";
+ final String subName2 = "read-write-sub-2";
final String topicName = "persistent://my-namespace/use/my-ns/read-write";
final int messageSize = 10;
@@ -127,7 +141,7 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
- .subscriptionName(subName)
+ .subscriptionName(subName1)
.subscribe();
@Cleanup
@@ -135,7 +149,7 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
- .subscriptionName(subName + "-2")
+ .subscriptionName(subName2)
.subscribe();
@Cleanup
@@ -156,6 +170,13 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
consumer2.acknowledge(consumer.receive().getMessageId());
}
}
+
+ // Wait until both cursors have persisted their metadata (write cursor ledger size > 0).
+ ManagedCursorMXBean cursorMXBean1 = getManagedCursorMXBean(topicName, subName1);
+ ManagedCursorMXBean cursorMXBean2 = getManagedCursorMXBean(topicName, subName2);
+ Awaitility.await().until(() -> cursorMXBean1.getWriteCursorLedgerLogicalSize() > 0);
+ Awaitility.await().until(() -> cursorMXBean2.getWriteCursorLedgerLogicalSize() > 0);
+
metricsList = metrics.generate();
Assert.assertEquals(metricsList.size(), 2);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);