You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/01/19 17:03:31 UTC

[pulsar] branch master updated: [flaky test] Fix unit tests that occasionally fail (#9226)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 63acd20  [flaky test] Fix unit tests that occasionally fail (#9226)
63acd20 is described below

commit 63acd204914734c7098e5b7fb9e6fb3ee37810a7
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Wed Jan 20 01:02:52 2021 +0800

    [flaky test] Fix unit tests that occasionally fail (#9226)
    
    * fix unit test
    
    * fix unit test
    
    * fix unit test
---
 .../broker/admin/IncrementPartitionsTest.java      |  13 ++-
 .../pulsar/broker/admin/TopicMessageTTLTest.java   |   1 +
 .../pulsar/broker/admin/v1/V1_AdminApiTest2.java   |   1 +
 .../broker/service/ConsumedLedgersTrimTest.java    |   6 +-
 .../service/MessagePublishBufferThrottleTest.java  |   1 +
 .../broker/service/ReplicatorRateLimiterTest.java  |   3 +
 .../pulsar/broker/service/ReplicatorTestBase.java  | 129 +++++++++++++--------
 .../service/persistent/TopicDuplicationTest.java   |  28 ++---
 8 files changed, 112 insertions(+), 70 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
index 65def47..d3f5d7b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
@@ -22,6 +22,8 @@ import static org.testng.Assert.assertEquals;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
 import lombok.Cleanup;
 import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -33,6 +35,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -115,13 +118,17 @@ public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest {
                 .create();
 
         admin.topics().updatePartitionedTopic(partitionedTopicName, 2);
-        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2);
+        //zk update takes some time
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
+                assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2));
 
         admin.topics().updatePartitionedTopic(partitionedTopicName, 10);
-        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
+                assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10));
 
         admin.topics().updatePartitionedTopic(partitionedTopicName, 20);
-        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
+                assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20));
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java
index 7bb93d7..e0eb52b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java
@@ -49,6 +49,7 @@ public class TopicMessageTTLTest extends MockedPulsarServiceBaseTest {
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
+        resetConfig();
         this.conf.setSystemTopicEnabled(true);
         this.conf.setTopicLevelPoliciesEnabled(true);
         this.conf.setTtlDurationDefaultInSeconds(3600);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
index e797eb8..6d4148e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
@@ -83,6 +83,7 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest {
     @BeforeMethod
     @Override
     public void setup() throws Exception {
+        resetConfig();
         conf.setLoadBalancerEnabled(true);
         super.internalSetup();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
index af3a589..678d9a5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
@@ -35,6 +35,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 
 public class ConsumedLedgersTrimTest extends BrokerTestBase {
 
@@ -45,9 +46,10 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
         //No-op
     }
 
+    @AfterMethod
     @Override
     protected void cleanup() throws Exception {
-        //No-op
+        super.internalCleanup();
     }
 
     @Test
@@ -101,7 +103,7 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
 
 
     @Test
-    public void TestConsumedLedgersTrimNoSubscriptions() throws Exception {
+    public void testConsumedLedgersTrimNoSubscriptions() throws Exception {
         conf.setRetentionCheckIntervalInSeconds(1);
         conf.setBrokerDeleteInactiveTopicsEnabled(false);
         super.baseSetup();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
index 6dd4515..75248f4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
@@ -43,6 +43,7 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
+        resetConfig();
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
index 9e68cc5..5a4f66d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
@@ -69,6 +69,9 @@ public class ReplicatorRateLimiterTest extends ReplicatorTestBase {
     @AfterClass(alwaysRun = true, timeOut = 300000)
     void shutdown() throws Exception {
         super.shutdown();
+        resetConfig1();
+        resetConfig2();
+        resetConfig3();
     }
 
     enum DispatchRateType {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 9f7ef09..38ed451 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -110,23 +110,7 @@ public class ReplicatorTestBase {
         // NOTE: we have to instantiate a new copy of System.getProperties() to make sure pulsar1 and pulsar2 have
         // completely
         // independent config objects instead of referring to the same properties object
-        config1.setClusterName("r1");
-        config1.setAdvertisedAddress("localhost");
-        config1.setWebServicePort(Optional.of(0));
-        config1.setWebServicePortTls(Optional.of(0));
-        config1.setZookeeperServers("127.0.0.1:" + bkEnsemble1.getZookeeperPort());
-        config1.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
-        config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
-        config1.setBrokerDeleteInactiveTopicsFrequencySeconds(
-                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
-        config1.setBrokerServicePort(Optional.of(0));
-        config1.setBrokerServicePortTls(Optional.of(0));
-        config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
-        config1.setDefaultNumberOfNamespaceBundles(1);
-        config1.setAllowAutoTopicCreationType("non-partitioned");
+        setConfig1DefaultValue();
         pulsar1 = new PulsarService(config1);
         pulsar1.start();
         ns1 = pulsar1.getBrokerService();
@@ -141,23 +125,7 @@ public class ReplicatorTestBase {
         bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
         bkEnsemble2.start();
 
-        config2.setClusterName("r2");
-        config2.setAdvertisedAddress("localhost");
-        config2.setWebServicePort(Optional.of(0));
-        config2.setWebServicePortTls(Optional.of(0));
-        config2.setZookeeperServers("127.0.0.1:" + bkEnsemble2.getZookeeperPort());
-        config2.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
-        config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
-        config2.setBrokerDeleteInactiveTopicsFrequencySeconds(
-                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
-        config2.setBrokerServicePort(Optional.of(0));
-        config2.setBrokerServicePortTls(Optional.of(0));
-        config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
-        config2.setDefaultNumberOfNamespaceBundles(1);
-        config2.setAllowAutoTopicCreationType("non-partitioned");
+        setConfig2DefaultValue();
         pulsar2 = new PulsarService(config2);
         pulsar2.start();
         ns2 = pulsar2.getBrokerService();
@@ -172,23 +140,7 @@ public class ReplicatorTestBase {
         bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
         bkEnsemble3.start();
 
-        config3.setClusterName("r3");
-        config3.setAdvertisedAddress("localhost");
-        config3.setWebServicePort(Optional.of(0));
-        config3.setWebServicePortTls(Optional.of(0));
-        config3.setZookeeperServers("127.0.0.1:" + bkEnsemble3.getZookeeperPort());
-        config3.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
-        config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
-        config3.setBrokerDeleteInactiveTopicsFrequencySeconds(
-                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
-        config3.setBrokerServicePort(Optional.of(0));
-        config3.setBrokerServicePortTls(Optional.of(0));
-        config3.setTlsEnabled(true);
-        config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config3.setDefaultNumberOfNamespaceBundles(1);
-        config3.setAllowAutoTopicCreationType("non-partitioned");
+        setConfig3DefaultValue();
         pulsar3 = new PulsarService(config3);
         pulsar3.start();
         ns3 = pulsar3.getBrokerService();
@@ -227,6 +179,81 @@ public class ReplicatorTestBase {
 
     }
 
+    private void setConfig3DefaultValue() {
+        config3.setClusterName("r3");
+        config3.setAdvertisedAddress("localhost");
+        config3.setWebServicePort(Optional.of(0));
+        config3.setWebServicePortTls(Optional.of(0));
+        config3.setZookeeperServers("127.0.0.1:" + bkEnsemble3.getZookeeperPort());
+        config3.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
+        config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
+        config3.setBrokerDeleteInactiveTopicsFrequencySeconds(
+                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
+        config3.setBrokerServicePort(Optional.of(0));
+        config3.setBrokerServicePortTls(Optional.of(0));
+        config3.setTlsEnabled(true);
+        config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config3.setDefaultNumberOfNamespaceBundles(1);
+        config3.setAllowAutoTopicCreationType("non-partitioned");
+    }
+
+    public void setConfig1DefaultValue(){
+        config1.setClusterName("r1");
+        config1.setAdvertisedAddress("localhost");
+        config1.setWebServicePort(Optional.of(0));
+        config1.setWebServicePortTls(Optional.of(0));
+        config1.setZookeeperServers("127.0.0.1:" + bkEnsemble1.getZookeeperPort());
+        config1.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
+        config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
+        config1.setBrokerDeleteInactiveTopicsFrequencySeconds(
+                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
+        config1.setBrokerServicePort(Optional.of(0));
+        config1.setBrokerServicePortTls(Optional.of(0));
+        config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+        config1.setDefaultNumberOfNamespaceBundles(1);
+        config1.setAllowAutoTopicCreationType("non-partitioned");
+    }
+
+    public void setConfig2DefaultValue() {
+        config2.setClusterName("r2");
+        config2.setAdvertisedAddress("localhost");
+        config2.setWebServicePort(Optional.of(0));
+        config2.setWebServicePortTls(Optional.of(0));
+        config2.setZookeeperServers("127.0.0.1:" + bkEnsemble2.getZookeeperPort());
+        config2.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
+        config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
+        config2.setBrokerDeleteInactiveTopicsFrequencySeconds(
+                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
+        config2.setBrokerServicePort(Optional.of(0));
+        config2.setBrokerServicePortTls(Optional.of(0));
+        config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+        config2.setDefaultNumberOfNamespaceBundles(1);
+        config2.setAllowAutoTopicCreationType("non-partitioned");
+    }
+
+    public void resetConfig1() {
+        config1 = new ServiceConfiguration();
+        setConfig1DefaultValue();
+    }
+
+    public void resetConfig2() {
+        config2 = new ServiceConfiguration();
+        setConfig2DefaultValue();
+    }
+
+    public void resetConfig3() {
+        config3 = new ServiceConfiguration();
+        setConfig3DefaultValue();
+    }
+
     private int inSec(int time, TimeUnit unit) {
         return (int) TimeUnit.SECONDS.convert(time, unit);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
index 397a6a5..fdea264 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -106,13 +106,13 @@ public class TopicDuplicationTest extends ProducerConsumerBase {
     }
 
     @Test(timeOut = 30000)
-    private void testTopicPolicyTakeSnapshot() throws Exception {
+    public void testTopicPolicyTakeSnapshot() throws Exception {
         resetConfig();
         conf.setSystemTopicEnabled(true);
         conf.setTopicLevelPoliciesEnabled(true);
         conf.setBrokerDeduplicationEnabled(true);
         conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
-        conf.setBrokerDeduplicationSnapshotIntervalSeconds(5);
+        conf.setBrokerDeduplicationSnapshotIntervalSeconds(7);
         conf.setBrokerDeduplicationEntriesInterval(20000);
         super.internalCleanup();
         super.internalSetup();
@@ -124,10 +124,10 @@ public class TopicDuplicationTest extends ProducerConsumerBase {
         Producer<String> producer = pulsarClient
                 .newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName(producerName).create();
         waitCacheInit(topicName);
-        admin.topics().setDeduplicationSnapshotInterval(topicName, 1);
-        admin.namespaces().setDeduplicationSnapshotInterval(myNamespace, 2);
+        admin.topics().setDeduplicationSnapshotInterval(topicName, 3);
+        admin.namespaces().setDeduplicationSnapshotInterval(myNamespace, 5);
 
-        int msgNum = 50;
+        int msgNum = 10;
         CountDownLatch countDownLatch = new CountDownLatch(msgNum);
         for (int i = 0; i < msgNum; i++) {
             producer.newMessage().value("msg" + i).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
@@ -139,19 +139,19 @@ public class TopicDuplicationTest extends ProducerConsumerBase {
                 .getManagedLedger().getLastConfirmedEntry();
         assertEquals(seqId, msgNum - 1);
         assertEquals(position.getEntryId(), msgNum - 1);
-        //The first time, use topic-leve policies, 1 second delay + 1 second interval
-        Awaitility.await().atMost(2100, TimeUnit.MILLISECONDS)
+        //The first time, use topic-level policies, 1 second delay + 3 second interval
+        Awaitility.await().atMost(5000, TimeUnit.MILLISECONDS)
                 .until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
                         .getMarkDeletedPosition()).getEntryId() == msgNum - 1);
         ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
         PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
         assertEquals(position, markDeletedPosition);
 
-        //remove topic-level policies, namespace-level should be used, interval becomes 2 seconds
+        //remove topic-level policies, namespace-level should be used, interval becomes 5 seconds
         admin.topics().removeDeduplicationSnapshotInterval(topicName);
         producer.newMessage().value("msg").send();
-        //zk update time + interval time
-        Awaitility.await().atMost( 3000, TimeUnit.MILLISECONDS)
+        //zk update time + 5 second interval time
+        Awaitility.await().atMost( 7, TimeUnit.SECONDS)
                 .until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
                         .getMarkDeletedPosition()).getEntryId() == msgNum);
         markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
@@ -159,20 +159,20 @@ public class TopicDuplicationTest extends ProducerConsumerBase {
         assertEquals(msgNum, markDeletedPosition.getEntryId());
         assertEquals(position, markDeletedPosition);
 
-        //4 remove namespace-level policies, broker-level should be used, interval becomes 2 seconds
+        //4 remove namespace-level policies, broker-level should be used, interval becomes 7 seconds
         admin.namespaces().removeDeduplicationSnapshotInterval(myNamespace);
-        Awaitility.await().atMost(2, TimeUnit.SECONDS)
+        Awaitility.await().atMost(4, TimeUnit.SECONDS)
                 .until(() -> (admin.namespaces().getDeduplicationSnapshotInterval(myNamespace) == null));
         producer.newMessage().value("msg").send();
         //ensure that the time exceeds the scheduling interval of ns and topic, but no snapshot is generated
         Thread.sleep(3000);
         markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
         position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
-        // broker-level interval is 5 seconds, so 3 seconds will not take a snapshot
+        // broker-level interval is 7 seconds, so 3 seconds will not take a snapshot
         assertNotEquals(msgNum + 1, markDeletedPosition.getEntryId());
         assertNotEquals(position, markDeletedPosition);
         // wait for scheduler
-        Awaitility.await().atMost(3, TimeUnit.SECONDS)
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                 .until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
                         .getMarkDeletedPosition()).getEntryId() == msgNum + 1);
         markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();