You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2022/12/09 01:52:53 UTC

[pulsar] branch branch-2.10 updated: [branch-2.10][fix][flaky-test] ManagedCursorMetricsTest.testManagedCursorMetrics (#17504)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 6c4e9ef532d [branch-2.10][fix][flaky-test] ManagedCursorMetricsTest.testManagedCursorMetrics (#17504)
6c4e9ef532d is described below

commit 6c4e9ef532d5d509c28c9728fc338489880922a4
Author: fengyubiao <99...@qq.com>
AuthorDate: Fri Dec 9 09:52:46 2022 +0800

    [branch-2.10][fix][flaky-test] ManagedCursorMetricsTest.testManagedCursorMetrics (#17504)
---
 .../broker/stats/ManagedCursorMetricsTest.java     | 157 ++++++++++++++++-----
 1 file changed, 125 insertions(+), 32 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index 60e74d5ca20..dbd40dc3d7c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
-import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
+import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -30,6 +32,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -38,21 +41,24 @@ import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.PulsarTestClient;
 import org.apache.pulsar.common.stats.Metrics;
 import org.awaitility.Awaitility;
+import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
 public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
 
-    @BeforeMethod(alwaysRun = true)
+    @BeforeClass
     @Override
     protected void setup() throws Exception {
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setSystemTopicEnabled(false);
         super.internalSetup();
     }
 
-    @AfterMethod(alwaysRun = true)
+    @AfterClass
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
@@ -63,21 +69,28 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
         return PulsarTestClient.create(clientBuilder);
     }
 
+    /***
+     * This method has overridden these case:
+     *    brk_ml_cursor_persistLedgerSucceed
+     *    brk_ml_cursor_persistLedgerErrors
+     *    brk_ml_cursor_persistZookeeperSucceed
+     *    brk_ml_cursor_nonContiguousDeletedMessagesRange
+     * But not overridden "brk_ml_cursor_nonContiguousDeletedMessagesRange".
+     */
     @Test
     public void testManagedCursorMetrics() throws Exception {
         final String subName = "my-sub";
         final String topicName = "persistent://my-namespace/use/my-ns/my-topic1";
-        final int messageSize = 10;
-
+        /** Before create cursor. Verify metrics will not be generated. **/
+        // Create ManagedCursorMetrics and verify empty.
         ManagedCursorMetrics metrics = new ManagedCursorMetrics(pulsar);
-
         List<Metrics> metricsList = metrics.generate();
         Assert.assertTrue(metricsList.isEmpty());
-
-        metricsList = metrics.generate();
-        Assert.assertTrue(metricsList.isEmpty());
-
-        PulsarTestClient pulsarClient = (PulsarTestClient) this.pulsarClient;
+        /**
+         * Trigger the cursor ledger creation.
+         * After create cursor. Verify all metrics is zero.
+         */
+        // Trigger cursor creation.
         @Cleanup
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                 .topic(topicName)
@@ -86,30 +99,108 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
                 .subscriptionName(subName)
                 .isAckReceiptEnabled(true)
                 .subscribe();
-
         @Cleanup
         Producer<byte[]> producer = this.pulsarClient.newProducer()
                 .topic(topicName)
+                .enableBatching(false)
                 .create();
-
-        for(PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) {
-            ledgerHandle.close();
-        }
-
-        for (int i = 0; i < messageSize; i++) {
+        final PersistentSubscription persistentSubscription =
+                (PersistentSubscription) pulsar.getBrokerService()
+                        .getTopic(topicName, false).get().get().getSubscription(subName);
+        final ManagedCursorImpl managedCursor = (ManagedCursorImpl) persistentSubscription.getCursor();
+        ManagedCursorMXBean managedCursorMXBean = managedCursor.getStats();
+        // Assert.
+        metricsList = metrics.generate();
+        Assert.assertFalse(metricsList.isEmpty());
+        /*
+          see: https://github.com/apache/pulsar/pull/17504
+          "createNewMetadataLedger" triggers once BK write, and "initialize" triggers the execution of
+          "createNewMetadataLedger". The logic of the branch master has been changed, so this line is different.
+         */
+        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 1L);
+        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
+        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
+        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
+        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
+        /**
+         * 1. Send many messages, and only ack half. After the cursor data is written to BK,
+         *    verify "brk_ml_cursor_persistLedgerSucceed" and "brk_ml_cursor_nonContiguousDeletedMessagesRange".
+         * 2. Ack another half, verify "brk_ml_cursor_nonContiguousDeletedMessagesRange" is zero.
+         */
+        // Send many message and ack half.
+        List<MessageId> keepsMessageIdList = new ArrayList<>();
+        for (int i = 0; i < 30; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
-            consumer.acknowledge(consumer.receive().getMessageId());
+            if (i < 10 || i > 20) {
+                consumer.acknowledge(consumer.receive().getMessageId());
+            } else {
+                keepsMessageIdList.add(consumer.receive().getMessageId());
+            }
         }
-
-        Awaitility.await().until(() -> pulsarClient.getConnection(topicName).get().getPendingRequests().size() == 0);
+        // Wait persistent.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS)
+                .until(() -> managedCursorMXBean.getPersistLedgerSucceed() > 0);
+        // Assert.
+        metricsList = metrics.generate();
+        Assert.assertFalse(metricsList.isEmpty());
+        Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 1L);
+        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
+        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
+        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
+        Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"),
+                0L);
+        // Ack another half.
+        for (MessageId messageId : keepsMessageIdList){
+            consumer.acknowledge(messageId);
+        }
+        // Wait persistent.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS)
+                .until(() -> managedCursor.getTotalNonContiguousDeletedMessagesRange() == 0);
+        // Assert.
+        metricsList = metrics.generate();
+        Assert.assertFalse(metricsList.isEmpty());
+        Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 1L);
+        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
+        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
+        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
+        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
+        /**
+         * Make BK error, and send many message, then wait cursor persistent finish.
+         * After the cursor data is written to ZK, verify "brk_ml_cursor_persistLedgerErrors" and
+         * "brk_ml_cursor_persistZookeeperSucceed".
+         */
+        // Send amd ack messages, at the same time makes BK error.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
+            String message = UUID.randomUUID().toString();
+            producer.send(message.getBytes());
+            consumer.acknowledge(consumer.receive().getMessageId());
+            // Make BK error.
+            LedgerHandle ledgerHandle = Whitebox.getInternalState(managedCursor, "cursorLedger");
+            ledgerHandle.close();
+            return managedCursorMXBean.getPersistLedgerErrors() > 0
+                    && managedCursorMXBean.getPersistZookeeperSucceed() > 0;
+        });
+        // Assert.
         metricsList = metrics.generate();
         Assert.assertFalse(metricsList.isEmpty());
-        Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 0L);
+        Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 1L);
         Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
         Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
         Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
         Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
+        /**
+         * TODO verify "brk_ml_cursor_persistZookeeperErrors".
+         * This is not easy to implement, we can use {@link #mockZooKeeper} to fail ZK, but we cannot identify whether
+         * the request is triggered by the "create new ledger then write ZK" or the "persistent cursor then write ZK".
+         * The cursor path is "/managed-ledgers/my-namespace/use/my-ns/persistent/my-topic1/my-sub". Maybe we can
+         * mock/spy ManagedCursorImpl to overridden this case in another PR.
+         */
+        mockZooKeeper.unsetAlwaysFail();
+        producer.close();
+        consumer.close();
+        managedCursor.close();
+        admin.topics().delete(topicName, true);
     }
 
     private ManagedCursorMXBean getManagedCursorMXBean(String topicName, String subscriptionName)
@@ -157,10 +248,6 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
                 .topic(topicName)
                 .create();
 
-        for (PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) {
-            ledgerHandle.close();
-        }
-
         for (int i = 0; i < messageSize; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
@@ -179,12 +266,18 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
 
         metricsList = metrics.generate();
         Assert.assertEquals(metricsList.size(), 2);
-        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
-        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
+        Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 0L);
+        Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 0L);
         Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);
 
-        Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
-        Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
+        Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 0L);
+        Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 0L);
         Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);
+
+        // cleanup.
+        consumer.close();
+        consumer2.close();
+        producer.close();
+        admin.topics().delete(topicName, true);
     }
 }