You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/03 04:08:51 UTC
[pulsar] branch master updated: [fix][flaky-test]ManagedCursorMetricsTest.testManagedCursorMetrics (#16878)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a8231a4f821 [fix][flaky-test]ManagedCursorMetricsTest.testManagedCursorMetrics (#16878)
a8231a4f821 is described below
commit a8231a4f821b360d7469685c77268d9591fb072d
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Wed Aug 3 12:08:44 2022 +0800
[fix][flaky-test]ManagedCursorMetricsTest.testManagedCursorMetrics (#16878)
---
.../broker/stats/ManagedCursorMetricsTest.java | 148 ++++++++++++++++-----
1 file changed, 116 insertions(+), 32 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index 8ab3ee30693..fe63942fd13 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -18,14 +18,20 @@
*/
package org.apache.pulsar.broker.stats;
+import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
-import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -34,15 +40,16 @@ import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarTestClient;
import org.apache.pulsar.common.stats.Metrics;
import org.awaitility.Awaitility;
+import org.powermock.reflect.Whitebox;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
- @BeforeMethod(alwaysRun = true)
+ @BeforeClass
@Override
protected void setup() throws Exception {
conf.setTopicLevelPoliciesEnabled(false);
@@ -50,7 +57,7 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
super.internalSetup();
}
- @AfterMethod(alwaysRun = true)
+ @AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
@@ -61,21 +68,28 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
return PulsarTestClient.create(clientBuilder);
}
+ /***
+ * This method has overridden these case:
+ * brk_ml_cursor_persistLedgerSucceed
+ * brk_ml_cursor_persistLedgerErrors
+ * brk_ml_cursor_persistZookeeperSucceed
+ * brk_ml_cursor_nonContiguousDeletedMessagesRange
+ * But not overridden "brk_ml_cursor_nonContiguousDeletedMessagesRange".
+ */
@Test
public void testManagedCursorMetrics() throws Exception {
final String subName = "my-sub";
final String topicName = "persistent://my-namespace/use/my-ns/my-topic1";
- final int messageSize = 10;
-
+ /** Before create cursor. Verify metrics will not be generated. **/
+ // Create ManagedCursorMetrics and verify empty.
ManagedCursorMetrics metrics = new ManagedCursorMetrics(pulsar);
-
List<Metrics> metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());
-
- metricsList = metrics.generate();
- Assert.assertTrue(metricsList.isEmpty());
-
- PulsarTestClient pulsarClient = (PulsarTestClient) this.pulsarClient;
+ /**
+ * Trigger the cursor ledger creation.
+ * After create cursor. Verify all metrics is zero.
+ */
+ // Trigger cursor creation.
@Cleanup
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
.topic(topicName)
@@ -84,28 +98,84 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
.subscriptionName(subName)
.isAckReceiptEnabled(true)
.subscribe();
-
-
@Cleanup
Producer<byte[]> producer = this.pulsarClient.newProducer()
.topic(topicName)
+ .enableBatching(false)
.create();
-
- producer.send("trigger-cursor-ledger-creation".getBytes());
- // Trigger the cursor ledger creation
- consumer.acknowledge(consumer.receive().getMessageId());
-
- for(PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) {
- ledgerHandle.close();
- }
-
- for (int i = 0; i < messageSize; i++) {
+ final PersistentSubscription persistentSubscription =
+ (PersistentSubscription) pulsar.getBrokerService()
+ .getTopic(topicName, false).get().get().getSubscription(subName);
+ final ManagedCursorImpl managedCursor = (ManagedCursorImpl) persistentSubscription.getCursor();
+ ManagedCursorMXBean managedCursorMXBean = managedCursor.getStats();
+ // Assert.
+ metricsList = metrics.generate();
+ Assert.assertFalse(metricsList.isEmpty());
+ Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 0L);
+ Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
+ Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
+ Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
+ Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
+ /**
+ * 1. Send many messages, and only ack half. After the cursor data is written to BK,
+ * verify "brk_ml_cursor_persistLedgerSucceed" and "brk_ml_cursor_nonContiguousDeletedMessagesRange".
+ * 2. Ack another half, verify "brk_ml_cursor_nonContiguousDeletedMessagesRange" is zero.
+ */
+ // Send many message and ack half.
+ List<MessageId> keepsMessageIdList = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
- consumer.acknowledge(consumer.receive().getMessageId());
+ if (i < 10 || i > 20) {
+ consumer.acknowledge(consumer.receive().getMessageId());
+ } else {
+ keepsMessageIdList.add(consumer.receive().getMessageId());
+ }
}
-
- Awaitility.await().until(() -> pulsarClient.getConnection(topicName).get().getPendingRequests().size() == 0);
+ // Wait persistent.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS)
+ .until(() -> managedCursorMXBean.getPersistLedgerSucceed() > 0);
+ // Assert.
+ metricsList = metrics.generate();
+ Assert.assertFalse(metricsList.isEmpty());
+ Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 0L);
+ Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
+ Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
+ Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
+ Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"),
+ 0L);
+ // Ack another half.
+ for (MessageId messageId : keepsMessageIdList){
+ consumer.acknowledge(messageId);
+ }
+ // Wait persistent.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS)
+ .until(() -> managedCursor.getTotalNonContiguousDeletedMessagesRange() == 0);
+ // Assert.
+ metricsList = metrics.generate();
+ Assert.assertFalse(metricsList.isEmpty());
+ Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 0L);
+ Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
+ Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
+ Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
+ Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
+ /**
+ * Make BK error, and send many message, then wait cursor persistent finish.
+ * After the cursor data is written to ZK, verify "brk_ml_cursor_persistLedgerErrors" and
+ * "brk_ml_cursor_persistZookeeperSucceed".
+ */
+ // Send amd ack messages, at the same time makes BK error.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
+ String message = UUID.randomUUID().toString();
+ producer.send(message.getBytes());
+ consumer.acknowledge(consumer.receive().getMessageId());
+ // Make BK error.
+ LedgerHandle ledgerHandle = Whitebox.getInternalState(managedCursor, "cursorLedger");
+ ledgerHandle.close();
+ return managedCursorMXBean.getPersistLedgerErrors() > 0
+ && managedCursorMXBean.getPersistZookeeperSucceed() > 0;
+ });
+ // Assert.
metricsList = metrics.generate();
Assert.assertFalse(metricsList.isEmpty());
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 0L);
@@ -113,6 +183,18 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
+ /**
+ * TODO verify "brk_ml_cursor_persistZookeeperErrors".
+ * This is not easy to implement, we can use {@link #mockZooKeeper} to fail ZK, but we cannot identify whether
+ * the request is triggered by the "create new ledger then write ZK" or the "persistent cursor then write ZK".
+ * The cursor path is "/managed-ledgers/my-namespace/use/my-ns/persistent/my-topic1/my-sub". Maybe we can
+ * mock/spy ManagedCursorImpl to overridden this case in another PR.
+ */
+ mockZooKeeper.unsetAlwaysFail();
+ producer.close();
+ consumer.close();
+ managedCursor.close();
+ admin.topics().delete(topicName, true);
}
@Test
@@ -150,10 +232,6 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
.topic(topicName)
.create();
- for (PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) {
- ledgerHandle.close();
- }
-
for (int i = 0; i < messageSize; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
@@ -172,5 +250,11 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 0L);
Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 0L);
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);
+
+ // cleanup.
+ consumer.close();
+ consumer2.close();
+ producer.close();
+ admin.topics().delete(topicName, true);
}
}