Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/19 06:24:43 UTC

[GitHub] [pulsar] eolivelli commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

eolivelli commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r998987315


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -77,8 +77,13 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
-                    long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
-                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
+                    MessageImpl.EntryMetadata entryMetadata = MessageImpl.getEntryMetadata(entry.getDataBuffer());

Review Comment:
   I think we can save resources by not allocating this object.
   We can move this logic into the new method `isEntryExpired(messageTTLInSeconds, entryTimestamp, delayTime)`.
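
To illustrate the suggestion, here is a minimal, self-contained sketch of an allocation-free check that takes only primitives. It is not the PR's actual implementation: the class name `ExpirySketch`, the extra `nowMillis` parameter (added so the example is deterministic), and the exact expiry rule are assumptions, with the rule inferred only from the expectations in the test hunk of this review (TTL 60s with a 50s delay expires, TTL 40s with a 50s delay does not).

```java
import java.util.concurrent.TimeUnit;

public class ExpirySketch {

    /**
     * Hypothetical helper: decides expiry from primitive values only, so no
     * per-entry metadata holder has to be allocated by the caller.
     * The rule below is an assumption, not the behavior confirmed by the PR.
     */
    public static boolean isEntryExpired(int messageTTLInSeconds, long entryTimestamp,
                                         long delayTime, long nowMillis) {
        if (messageTTLInSeconds <= 0) {
            return false; // TTL disabled
        }
        long ttlMillis = TimeUnit.SECONDS.toMillis(messageTTLInSeconds);
        // Classic TTL check against the publish timestamp.
        boolean ttlElapsed = nowMillis > entryTimestamp + ttlMillis;
        // Assumed rule: a delayed message is protected from expiry only while
        // its delivery time lies at least one TTL in the future.
        boolean delayAllowsExpiry = delayTime <= 0 || delayTime < nowMillis + ttlMillis;
        return ttlElapsed && delayAllowsExpiry;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Mirrors the values used in the test hunk: publish time 1, delivery in 50s.
        System.out.println(isEntryExpired(60, 1L, now + 50_000, now));
        System.out.println(isEntryExpired(40, 1L, now + 50_000, now));
    }
}
```

With this shape the caller in `expireMessages` would read the two longs from the buffer and pass them straight through, instead of building an `EntryMetadata` instance for every entry visited by the cursor scan.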
   



##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java:
##########
@@ -484,6 +484,40 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {
         }
     }
 
+    @Test(timeOut = 30000)
+    public void testDelayMessageConflictWithTtlPolicy() {
+        String data = "test-message";
+        ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
+        byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
+
+        long curTime = System.currentTimeMillis();
+        try {
+            MessageMetadata messageMetadata = new MessageMetadata()
+                    .setSequenceId(1)
+                    .setProducerName("testProducer")
+                    .setPartitionKeyB64Encoded(false)
+                    .setPublishTime(1)
+                    .setDeliverAtTime(curTime + 50000);
+            byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+
+            MessageImpl.EntryMetadata entryMetadata = MessageImpl.getEntryMetadata(byteBuf);
+            assertEquals(entryMetadata.getDelayTime(), curTime + 50000); // delay 50s
+            assertEquals(entryMetadata.getPublishTime(), 1);
+
+            // when we set the delay message time to 50s, then when the TTL policy is 60s, it is greater than the
+            // time set for the delayed message, so the delayed message can be expired.
+            assertTrue(MessageImpl.isEntryExpired(60,
+                    entryMetadata.getPublishTime(), entryMetadata.getDelayTime()));
+
+            // when we set the delay message time to 50s, then when the TTL policy is 40s, it is less than the
+            // time set for the delayed message, so the delayed message cannot be expired.
+            assertFalse(MessageImpl.isEntryExpired(40,
+                    entryMetadata.getPublishTime(), entryMetadata.getDelayTime()));
+        } catch (Exception e) {
+            fail();

Review Comment:
   There is no need to call "fail()" here. Simply add "throws Exception" to the test method signature and let any exception escape the method.
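
A small sketch of why letting the exception escape is preferable. It uses plain Java rather than TestNG so that it runs on its own; `ThrowsSketch`, `parseMetadata`, and the tiny `run` helper are hypothetical stand-ins, and the bare `AssertionError` only approximates what a no-argument `fail()` produces.

```java
import java.io.IOException;

public class ThrowsSketch {

    /** Functional shape of a test method that may throw. */
    public interface ThrowingTest {
        void run() throws Exception;
    }

    // Hypothetical code under test; it always fails so both styles can be compared.
    public static void parseMetadata() throws Exception {
        throw new IOException("corrupt metadata");
    }

    // Style flagged in the review: the catch block swaps the real exception
    // for a bare failure, so the cause and its message are lost.
    public static void testWithCatchAndFail() {
        try {
            parseMetadata();
        } catch (Exception e) {
            throw new AssertionError(); // roughly what a no-argument fail() amounts to
        }
    }

    // Suggested style: declare the exception and let it escape the method.
    public static void testWithThrows() throws Exception {
        parseMetadata();
    }

    // Minimal stand-in for a test runner: returns whatever the test threw, or null.
    public static Throwable run(ThrowingTest test) {
        try {
            test.run();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }

    public static void main(String[] args) {
        System.out.println(run(ThrowsSketch::testWithCatchAndFail));
        System.out.println(run(ThrowsSketch::testWithThrows));
    }
}
```

In the second style the runner sees the original `IOException` with its message and stack trace, which is what makes a failing test easy to diagnose.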


