You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/15 02:36:33 UTC

[pulsar] 02/15: [fix][flaky-test] MessageTTLTest.testMessageExpiryAfterTopicUnload (#16462)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f8ac8ec0df51917475957a9d5dde36da1b4293d3
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jul 12 08:40:18 2022 +0800

    [fix][flaky-test] MessageTTLTest.testMessageExpiryAfterTopicUnload (#16462)
    
    (cherry picked from commit 7c9ad1c6df971c06ca0da0688959a36914db7fde)
---
 .../pulsar/broker/service/MessageTTLTest.java      | 33 +++++++++++++---------
 1 file changed, 20 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
index 31556197486..76f09377edc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
@@ -26,16 +26,20 @@ import static org.testng.Assert.assertNotNull;
 import com.google.common.collect.Lists;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
@@ -83,6 +87,8 @@ public class MessageTTLTest extends BrokerTestBase {
             sendFutureList.add(producer.sendAsync(message));
         }
         FutureUtil.waitForAll(sendFutureList).get();
+        MessageIdImpl firstMessageId = (MessageIdImpl) sendFutureList.get(0).get();
+        MessageIdImpl lastMessageId = (MessageIdImpl) sendFutureList.get(sendFutureList.size() - 1).get();
         producer.close();
         // unload a reload the topic
         // this action created a new ledger
@@ -94,19 +100,20 @@ public class MessageTTLTest extends BrokerTestBase {
         PersistentTopicInternalStats internalStatsBeforeExpire = admin.topics().getInternalStats(topicName);
         CursorStats statsBeforeExpire = internalStatsBeforeExpire.cursors.get(subscriptionName);
         log.info("markDeletePosition before expire {}", statsBeforeExpire.markDeletePosition);
-        assertEquals(statsBeforeExpire.markDeletePosition, PositionImpl.get(3, -1).toString());
-
-        // wall clock time, we have to make the message to be considered "expired"
-        Thread.sleep(this.conf.getTtlDurationDefaultInSeconds() * 2000L);
-        log.info("***** run message expiry now");
-        this.runMessageExpiryCheck();
-
-        // verify that the markDeletePosition was moved forward, and exacly to the last message
-        PersistentTopicInternalStats internalStatsAfterExpire = admin.topics().getInternalStats(topicName);
-        CursorStats statsAfterExpire = internalStatsAfterExpire.cursors.get(subscriptionName);
-        log.info("markDeletePosition after expire {}", statsAfterExpire.markDeletePosition);
-        assertEquals(statsAfterExpire.markDeletePosition, PositionImpl.get(3, numMsgs - 1 ).toString());
-
+        assertEquals(statsBeforeExpire.markDeletePosition,
+                PositionImpl.get(firstMessageId.getLedgerId(), -1).toString());
+
+        Awaitility.await().timeout(30, TimeUnit.SECONDS)
+                .pollDelay(3, TimeUnit.SECONDS).untilAsserted(() -> {
+            this.runMessageExpiryCheck();
+            log.info("***** run message expiry now");
+            // verify that the markDeletePosition was moved forward, and exactly to the last message
+            PersistentTopicInternalStats internalStatsAfterExpire = admin.topics().getInternalStats(topicName);
+            CursorStats statsAfterExpire = internalStatsAfterExpire.cursors.get(subscriptionName);
+            log.info("markDeletePosition after expire {}", statsAfterExpire.markDeletePosition);
+            assertEquals(statsAfterExpire.markDeletePosition, PositionImpl.get(lastMessageId.getLedgerId(),
+                    lastMessageId.getEntryId() ).toString());
+        });
     }
 
     @Test