Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/17 02:33:17 UTC

[GitHub] [pulsar] wolfstudy opened a new pull request, #15628: Fix delay messages are expired by TTL policy

wolfstudy opened a new pull request, #15628:
URL: https://github.com/apache/pulsar/pull/15628

   Signed-off-by: xiaolongran <xi...@tencent.com>
   
   
   ### Motivation
   
   The current TTL policy conflicts with delayed messages because the TTL check does not take a message's delay into account. When a message's delay is longer than the time specified by the TTL, we should skip the TTL check for that message: its delivery time has not yet arrived, so we must not expire it.
   
   ### Modifications
   
   When the TTL check is about to expire a message, check whether the current entry contains the `deliverAtTime` field. If it does, the entry is a delayed message, so check whether its delay time is greater than the TTL time. If it is greater, the entry cannot be expired at this time.
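
The check described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in this PR: the class `TtlDelaySketch`, the explicit `nowMs` parameter, and the convention that `deliverAtTimeMs <= 0` means "no delay set" are all hypothetical, and the delay is read here as `deliverAtTime - publishTime`.

```java
public final class TtlDelaySketch {
    private TtlDelaySketch() {}

    /**
     * Hypothetical helper (not Pulsar's MessageImpl.isEntryExpired).
     * All timestamps are epoch milliseconds; nowMs is passed in so the
     * logic can be exercised without depending on the system clock.
     */
    static boolean isEntryExpired(int ttlSeconds, long publishTimeMs,
                                  long deliverAtTimeMs, long nowMs) {
        if (ttlSeconds <= 0) {
            return false; // assumption: a non-positive TTL means TTL is disabled
        }
        long ttlMs = ttlSeconds * 1000L;
        // Assumption: deliverAtTimeMs <= 0 means the message is not delayed.
        if (deliverAtTimeMs > 0 && deliverAtTimeMs - publishTimeMs > ttlMs) {
            // The delay is longer than the TTL, so skip the TTL check.
            return false;
        }
        // Ordinary TTL check based on the publish time.
        return nowMs - publishTimeMs > ttlMs;
    }
}
```

Under this reading, a message published 100 s ago with a 50 s delay is kept when the TTL is 40 s and expired when the TTL is 60 s. A message whose delay exceeds the TTL is never expired by this sketch, even after its delivery time has passed.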
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] wolfstudy commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r1003945282


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -77,8 +77,10 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
-                    long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
-                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
+                    // When the time of the delayed message is greater than the time specified by TTL, we should
+                    // give up checking the TTL of the current delayed message, because the time of the delayed
+                    // message has not yet arrived, we cannot delete these messages.
+                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entry.getDataBuffer());

Review Comment:
   Yes @Jason918, if a user uses delayed messages and sets a TTL policy at the same time, the situation you described can occur. But compared with the TTL policy and delayed messages conflicting with each other and causing message loss, I think this cost is acceptable. Or is there a better way to avoid message loss in this scenario?





[GitHub] [pulsar] michaeljmarshall commented on pull request #15628: Fix delay messages are expired by TTL policy

Posted by "michaeljmarshall (via GitHub)" <gi...@apache.org>.
michaeljmarshall commented on PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#issuecomment-1610272141

   As discussed on the mailing list https://lists.apache.org/thread/w4jzk27qhtosgsz7l9bmhf1t7o9mxjhp, there is no plan to release 2.9.6, so I am going to remove the release/2.9.6 label






[GitHub] [pulsar] wolfstudy commented on pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#issuecomment-1282093681

   Sorry, I forgot to follow up on this change. I have now added a test case for it. PTAL again, @eolivelli @merlimat, thanks.




[GitHub] [pulsar] Jason918 commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r1005349780


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -77,8 +77,10 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
-                    long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
-                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
+                    // When the time of the delayed message is greater than the time specified by TTL, we should
+                    // give up checking the TTL of the current delayed message, because the time of the delayed
+                    // message has not yet arrived, we cannot delete these messages.
+                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entry.getDataBuffer());

Review Comment:
   I see that this solves the problem only for fixed-delay messages. But we have no way of knowing whether the user is setting a fixed delay or random delays. For random delays this won't solve anything (it's a binary search, so we do not go through all the messages); the TTL and the delay time still conflict.





[GitHub] [pulsar] github-actions[bot] commented on pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#issuecomment-1356625406

   The pr had no activity for 30 days, mark with Stale label.




[GitHub] [pulsar] github-actions[bot] commented on pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#issuecomment-1128337057

   @wolfstudy:Thanks for your contribution. For this PR, do we need to update docs?
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)




[GitHub] [pulsar] wolfstudy commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r1004328608


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -77,8 +77,10 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
-                    long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
-                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
+                    // When the time of the delayed message is greater than the time specified by TTL, we should
+                    // give up checking the TTL of the current delayed message, because the time of the delayed
+                    // message has not yet arrived, we cannot delete these messages.
+                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entry.getDataBuffer());

Review Comment:
   I don't think so. Why should users need to be aware of this? TTL and delayed messages are two completely independent things.
   
   The scope of TTL is an entire namespace (at least before version 2.9.0, when Topic Policies were not very stable). When the TTL policy is set at the namespace level, do we require users not to delay messages for longer than the TTL? I feel this is unacceptable for users.





[GitHub] [pulsar] github-actions[bot] commented on pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#issuecomment-1226692138

   The pr had no activity for 30 days, mark with Stale label.




[GitHub] [pulsar] eolivelli commented on pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#issuecomment-1291567307

   /pulsar-bot rereun-failure-checks




[GitHub] [pulsar] merlimat commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
merlimat commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r874483007


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -75,6 +75,18 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
+                    // When the time of the delayed message is greater than the time specified by TTL, we should
+                    // give up checking the TTL of the current delayed message, because the time of the delayed
+                    // message has not yet arrived, we cannot delete these messages.
+                    long delayTime = MessageImpl.getDelayTime(entry.getDataBuffer());

Review Comment:
   This is going to trigger parsing of the message metadata twice for each message. The check should be included in `MessageImpl.isEntryExpired()`.





[GitHub] [pulsar] 315157973 commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r1007714935


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -77,8 +77,10 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
-                    long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
-                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
+                    // When the time of the delayed message is greater than the time specified by TTL, we should
+                    // give up checking the TTL of the current delayed message, because the time of the delayed
+                    // message has not yet arrived, we cannot delete these messages.
+                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entry.getDataBuffer());

Review Comment:
   I feel that this is expected behavior. It's better not to set a TTL on topics that use delayed messages.





[GitHub] [pulsar] wolfstudy commented on pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#issuecomment-1286424331

   ping @eolivelli PTAL again, thanks




[GitHub] [pulsar] wolfstudy commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r889792717


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -75,6 +75,18 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
+                    // When the time of the delayed message is greater than the time specified by TTL, we should
+                    // give up checking the TTL of the current delayed message, because the time of the delayed
+                    // message has not yet arrived, we cannot delete these messages.
+                    long delayTime = MessageImpl.getDelayTime(entry.getDataBuffer());

Review Comment:
   Hello @merlimat 
   
   Here I define a POJO, `EntryMetadata`, that returns both the `publishTime` and `delayTime` fields from a single read of the entry. This avoids reading the entry multiple times, which causes the following error:
   
   ```
   8:22:40.475 [BookKeeperClientWorker-OrderedExecutor-11-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor - [persistent://cmq-251199518/CMQ_QUEUE-perf_queue_cap-8/perf_queue_cap-8-partition-0][251199518_perf_queue_cap-8_consumer] Error deserializing message for expiry check
   java.lang.IllegalStateException: Some required fields are missing
           at org.apache.pulsar.common.api.proto.MessageMetadata.checkRequiredFields(MessageMetadata.java:1378) ~[org.apache.pulsar-pulsar-common-2.9.2.jar:2.9.2]
           at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1373) ~[org.apache.pulsar-pulsar-common-2.9.2.jar:2.9.2]
           at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:435) ~[org.apache.pulsar-pulsar-common-2.9.2.jar:2.9.2]
           at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:422) ~[org.apache.pulsar-pulsar-common-2.9.2.jar:2.9.2]
           at org.apache.pulsar.client.impl.MessageImpl.getDelayTime(MessageImpl.java:286) ~[org.apache.pulsar-pulsar-client-original-2.9.2.jar:2.9.2]
           at org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor.lambda$expireMessages$0(PersistentMessageExpiryMonitor.java:82) ~[org.apache.pulsar-pulsar-broker-2.9.2.jar:2.9.2]
           at org.apache.bookkeeper.mledger.impl.OpFindNewest.readEntryComplete(OpFindNewest.java:89) ~[org.apache.pulsar-managed-ledger-2.9.2.jar:2.9.2]
           at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.lambda$asyncReadEntry0$0(EntryCacheImpl.java:222) ~[org.apache.pulsar-managed-ledger-2.9.2.jar:2.9.2]
           at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) [?:1.8.0_322]
           at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) [?:1.8.0_322]
           at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) [?:1.8.0_322]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.76.Final.jar:4.1.76.Final]
           at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
   ```
   
   PTAL again, thanks





[GitHub] [pulsar] wolfstudy commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r998999028


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -77,8 +77,13 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
-                    long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
-                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
+                    MessageImpl.EntryMetadata entryMetadata = MessageImpl.getEntryMetadata(entry.getDataBuffer());

Review Comment:
   Good idea, will fix this





[GitHub] [pulsar] Jason918 commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r1003931565


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -77,8 +77,10 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
-                    long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
-                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
+                    // When the time of the delayed message is greater than the time specified by TTL, we should
+                    // give up checking the TTL of the current delayed message, because the time of the delayed
+                    // message has not yet arrived, we cannot delete these messages.
+                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entry.getDataBuffer());

Review Comment:
   It's a binary search process. If the result we return depends on a field that is not monotonically increasing (`delayTime`), the search will generally behave unpredictably.
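
The concern can be illustrated with a simplified stand-in for the search. This is a sketch under stated assumptions, not the managed-ledger implementation: `FindNewestSketch` and `findNewestMatching` are hypothetical names, entries are reduced to integer indices, and the search is a textbook binary search that assumes the predicate is true on a prefix of the entries and false afterwards.

```java
import java.util.function.IntPredicate;

public final class FindNewestSketch {
    private FindNewestSketch() {}

    /**
     * Hypothetical, simplified binary search: returns the largest index in
     * [0, n) that the search believes matches, or -1 if none does. It is
     * only correct when the predicate is true on a prefix and false after.
     */
    static int findNewestMatching(int n, IntPredicate matches) {
        int lo = 0;
        int hi = n - 1;
        int newest = -1;
        while (lo <= hi) {
            int mid = lo + (hi - lo) / 2;
            if (matches.test(mid)) {
                newest = mid;   // mid matches, so look for a newer match
                lo = mid + 1;
            } else {
                hi = mid - 1;   // mid does not match, so look at older entries
            }
        }
        return newest;
    }
}
```

Suppose seven entries are all past the TTL by publish time, so a publish-time predicate returns index 6. If a delay-aware predicate reports "not expired" only for a delayed entry at index 3, the same search returns 2, so entries 4 to 6 are not expired even though they are past the TTL. If the delayed entry sits at index 1 instead, the search never probes it and returns 6, so the delayed entry is still expired along with everything else.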
   





[GitHub] [pulsar] Jason918 commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r1004227729


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -77,8 +77,10 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
-                    long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
-                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
+                    // When the time of the delayed message is greater than the time specified by TTL, we should
+                    // give up checking the TTL of the current delayed message, because the time of the delayed
+                    // message has not yet arrived, we cannot delete these messages.
+                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entry.getDataBuffer());

Review Comment:
   I am not sure why we need to handle this case.
   It is also a kind of message loss for non-delayed messages.
   Maybe we should just make it clear to users that the delay time (`deliverAtTime - publishTime`) must be smaller than the message TTL.





[GitHub] [pulsar] eolivelli commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r998987315


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -77,8 +77,13 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
-                    long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
-                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
+                    MessageImpl.EntryMetadata entryMetadata = MessageImpl.getEntryMetadata(entry.getDataBuffer());

Review Comment:
   I think we can save resources by not allocating this object.
   We can move this logic into a new method, `isEntryExpired(messageTTLInSeconds, entryTimestamp, delayTime)`.
   



##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java:
##########
@@ -484,6 +484,40 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {
         }
     }
 
+    @Test(timeOut = 30000)
+    public void testDelayMessageConflictWithTtlPolicy() {
+        String data = "test-message";
+        ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
+        byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
+
+        long curTime = System.currentTimeMillis();
+        try {
+            MessageMetadata messageMetadata = new MessageMetadata()
+                    .setSequenceId(1)
+                    .setProducerName("testProducer")
+                    .setPartitionKeyB64Encoded(false)
+                    .setPublishTime(1)
+                    .setDeliverAtTime(curTime + 50000);
+            byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+
+            MessageImpl.EntryMetadata entryMetadata = MessageImpl.getEntryMetadata(byteBuf);
+            assertEquals(entryMetadata.getDelayTime(), curTime + 50000); // delay 50s
+            assertEquals(entryMetadata.getPublishTime(), 1);
+
+            // when we set the delay message time to 50s, then when the TTL policy is 60s, it is greater than the
+            // time set for the delayed message, so the delayed message can be expired.
+            assertTrue(MessageImpl.isEntryExpired(60,
+                    entryMetadata.getPublishTime(), entryMetadata.getDelayTime()));
+
+            // when we set the delay message time to 50s, then when the TTL policy is 40s, it is less than the
+            // time set for the delayed message, so the delayed message cannot be expired.
+            assertFalse(MessageImpl.isEntryExpired(40,
+                    entryMetadata.getPublishTime(), entryMetadata.getDelayTime()));
+        } catch (Exception e) {
+            fail();

Review Comment:
   There is no need to call `fail()` here; simply add `throws Exception` to the test method signature and let any exception escape the method.





[GitHub] [pulsar] github-actions[bot] commented on pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#issuecomment-1283378783

   @wolfstudy Please add the following content to your PR description and select a checkbox:
   ```
   - [ ] `doc` <!-- Your PR contains doc changes -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   ```




[GitHub] [pulsar] wolfstudy commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r874937182


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -75,6 +75,18 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
+                    // When the time of the delayed message is greater than the time specified by TTL, we should
+                    // give up checking the TTL of the current delayed message, because the time of the delayed
+                    // message has not yet arrived, we cannot delete these messages.
+                    long delayTime = MessageImpl.getDelayTime(entry.getDataBuffer());

Review Comment:
   Yes, it is required here because the delay time can only be obtained from the entry. As with the publish time, we have to parse the current entry to get the delay time and decide whether the entry needs to expire. `MessageImpl.isEntryExpired` only receives the publish time that has already been parsed from the entry, so there is no way to reduce the parsing of the entry.





[GitHub] [pulsar] mattisonchao commented on pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#issuecomment-1150868597

   Another question about this PR:
   
   For Pulsar, do delayed messages take priority over TTL?




[GitHub] [pulsar] Jason918 commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r1005195914


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -77,8 +77,10 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
-                    long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
-                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
+                    // When the time of the delayed message is greater than the time specified by TTL, we should
+                    // give up checking the TTL of the current delayed message, because the time of the delayed
+                    // message has not yet arrived, we cannot delete these messages.
+                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entry.getDataBuffer());

Review Comment:
   Isn't the TTL set by your users?





[GitHub] [pulsar] mattisonchao commented on pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#issuecomment-1150862948

   Small Question:
   
   Why don't we cherry-pick this PR to branch 2.10?




[GitHub] [pulsar] github-actions[bot] commented on pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#issuecomment-1128337370

   @wolfstudy:Thanks for providing doc info!




[GitHub] [pulsar] wolfstudy commented on pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#issuecomment-1283521641

   > very good
   > 
   > I left one suggestion in order to save resources and GC time
   
   Hello Enrico, PTAL again thanks.




[GitHub] [pulsar] Jason918 commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r1004227729


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -77,8 +77,10 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
-                    long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
-                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
+                    // When the time of the delayed message is greater than the time specified by TTL, we should
+                    // give up checking the TTL of the current delayed message, because the time of the delayed
+                    // message has not yet arrived, we cannot delete these messages.
+                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entry.getDataBuffer());

Review Comment:
   I am not sure why we need to handle this case.
   A non-delayed message that is expired by TTL is also a kind of message loss.
   Maybe we should just make it clear to users that the delay time (`deliverAtTime - publishTime`) must be smaller than the message TTL.
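
   To illustrate that constraint, here is a hedged sketch of a check that could be applied when a delayed message is published. `DelayTtlCheck` and `delayFitsWithinTtl` are hypothetical names, not Pulsar APIs, and treating a TTL of 0 as "no TTL" is an assumption.

   ```java
   import java.util.concurrent.TimeUnit;

   // Hypothetical helper for illustration only; not part of the Pulsar API.
   final class DelayTtlCheck {
       private DelayTtlCheck() {}

       /** Returns true when the delay (deliverAtTime - publishTime) is strictly smaller than the TTL. */
       static boolean delayFitsWithinTtl(long publishMillis, long deliverAtMillis, int ttlSeconds) {
           if (ttlSeconds <= 0) {
               return true; // assumption: no TTL configured, so any delay is acceptable
           }
           // A deliverAtTime at or before the publish time means there is no effective delay.
           long delayMillis = Math.max(0L, deliverAtMillis - publishMillis);
           return delayMillis < TimeUnit.SECONDS.toMillis(ttlSeconds);
       }
   }
   ```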





[GitHub] [pulsar] congbobo184 commented on pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#issuecomment-1318540301

   @wolfstudy Hi, I moved this PR to release/2.9.5. If you have any questions, please ping me. Thanks.




[GitHub] [pulsar] poorbarcode commented on pull request #15628: Fix delay messages are expired by TTL policy

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#issuecomment-1501575654

   Since we will start the RC version of `3.0.0` on `2023-04-11`, I will change the label/milestone of PRs that have not been merged.
   - PRs of type `feature` are deferred to `3.1.0`
   - PRs of type `fix` are deferred to `3.0.1`
   
   So I am moving this PR to `3.0.1`.




[GitHub] [pulsar] wolfstudy commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r874937182


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -75,6 +75,18 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
+                    // When the time of the delayed message is greater than the time specified by TTL, we should
+                    // give up checking the TTL of the current delayed message, because the time of the delayed
+                    // message has not yet arrived, we cannot delete these messages.
+                    long delayTime = MessageImpl.getDelayTime(entry.getDataBuffer());

Review Comment:
   Good idea, will fix it.





[GitHub] [pulsar] wolfstudy commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r889792717


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -75,6 +75,18 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
+                    // When the time of the delayed message is greater than the time specified by TTL, we should
+                    // give up checking the TTL of the current delayed message, because the time of the delayed
+                    // message has not yet arrived, we cannot delete these messages.
+                    long delayTime = MessageImpl.getDelayTime(entry.getDataBuffer());

Review Comment:
   Hello @merlimat 
   
   Here I define a POJO, `EntryMetadata`, that returns both the `publishTime` and `delayTime` fields from a single getEntry. This avoids multiple getEntry calls, which caused the following error:
   
   ```
   8:22:40.475 [BookKeeperClientWorker-OrderedExecutor-11-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor - [persistent://cmq-251199518/CMQ_QUEUE-perf_queue_cap-8/perf_queue_cap-8-partition-0][251199518_perf_queue_cap-8_consumer] Error deserializing message for expiry check
   java.lang.IllegalStateException: Some required fields are missing
           at org.apache.pulsar.common.api.proto.MessageMetadata.checkRequiredFields(MessageMetadata.java:1378) ~[org.apache.pulsar-pulsar-common-2.9.2.jar:2.9.2]
           at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1373) ~[org.apache.pulsar-pulsar-common-2.9.2.jar:2.9.2]
           at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:435) ~[org.apache.pulsar-pulsar-common-2.9.2.jar:2.9.2]
           at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:422) ~[org.apache.pulsar-pulsar-common-2.9.2.jar:2.9.2]
           at org.apache.pulsar.client.impl.MessageImpl.getDelayTime(MessageImpl.java:286) ~[org.apache.pulsar-pulsar-client-original-2.9.2.jar:2.9.2]
           at org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor.lambda$expireMessages$0(PersistentMessageExpiryMonitor.java:82) ~[org.apache.pulsar-pulsar-broker-2.9.2.jar:2.9.2]
           at org.apache.bookkeeper.mledger.impl.OpFindNewest.readEntryComplete(OpFindNewest.java:89) ~[org.apache.pulsar-managed-ledger-2.9.2.jar:2.9.2]
           at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.lambda$asyncReadEntry0$0(EntryCacheImpl.java:222) ~[org.apache.pulsar-managed-ledger-2.9.2.jar:2.9.2]
           at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) [?:1.8.0_322]
           at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) [?:1.8.0_322]
           at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) [?:1.8.0_322]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.76.Final.jar:4.1.76.Final]
           at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
   ```
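
   For illustration, a holder that carries both fields from one parse could look roughly like the sketch below. `EntryMetadataSketch` is a hypothetical stand-in, not the `EntryMetadata` class in this PR, and the two-long `ByteBuffer` layout is an invented placeholder for the real message metadata, used only to keep the example self-contained.

   ```java
   import java.nio.ByteBuffer;

   // Hypothetical stand-in for illustration only; not the PR's EntryMetadata class.
   final class EntryMetadataSketch {
       private final long publishTime;
       private final long delayTime;

       private EntryMetadataSketch(long publishTime, long delayTime) {
           this.publishTime = publishTime;
           this.delayTime = delayTime;
       }

       /**
        * Parses both fields in a single pass. The layout (two consecutive longs)
        * is an assumption made for this sketch, not the real wire format.
        */
       static EntryMetadataSketch parse(ByteBuffer data) {
           // Read from a duplicate so the caller's position is left untouched.
           ByteBuffer view = data.duplicate();
           long publish = view.getLong();
           long delay = view.getLong();
           return new EntryMetadataSketch(publish, delay);
       }

       long getPublishTime() {
           return publishTime;
       }

       long getDelayTime() {
           return delayTime;
       }
   }
   ```

   The expiry check can then read `getPublishTime()` and `getDelayTime()` from the same object instead of parsing the entry once per field.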





[GitHub] [pulsar] wolfstudy commented on pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#issuecomment-1288407044

   ping @eolivelli PTAL again, thanks




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r1007805237


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -77,8 +77,10 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
-                    long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
-                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
+                    // When the time of the delayed message is greater than the time specified by TTL, we should
+                    // give up checking the TTL of the current delayed message, because the time of the delayed
+                    // message has not yet arrived, we cannot delete these messages.
+                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entry.getDataBuffer());

Review Comment:
   It looks like the delayed message tracker could tell the managed ledger the minimum position that has not yet expired, so that the cursor only skips entries before that position. Just a rough idea.
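
   A rough sketch of that idea, under stated assumptions: positions are ordered by `(ledgerId, entryId)`, the tracker can report the earliest position of a delayed message that is still pending, and `null` means there is none. `ExpiryBound`, `Pos`, and `canSkip` are hypothetical names and do not exist in Pulsar.

   ```java
   // Hypothetical types for illustration only; not part of the Pulsar API.
   final class ExpiryBound {
       private ExpiryBound() {}

       static final class Pos implements Comparable<Pos> {
           final long ledgerId;
           final long entryId;

           Pos(long ledgerId, long entryId) {
               this.ledgerId = ledgerId;
               this.entryId = entryId;
           }

           @Override
           public int compareTo(Pos other) {
               int byLedger = Long.compare(ledgerId, other.ledgerId);
               return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
           }
       }

       /**
        * An entry may be skipped only if it is at or before the TTL candidate position
        * and strictly before the earliest delayed message that is still pending.
        */
       static boolean canSkip(Pos entry, Pos ttlCandidate, Pos earliestPendingDelayed) {
           if (entry.compareTo(ttlCandidate) > 0) {
               return false; // beyond what TTL would expire
           }
           return earliestPendingDelayed == null || entry.compareTo(earliestPendingDelayed) < 0;
       }
   }
   ```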





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #15628: Fix delay messages are expired by TTL policy

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r1161560793


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -77,8 +77,10 @@ public boolean expireMessages(int messageTTLInSeconds) {
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
-                    long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
-                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
+                    // When the time of the delayed message is greater than the time specified by TTL, we should
+                    // give up checking the TTL of the current delayed message, because the time of the delayed
+                    // message has not yet arrived, we cannot delete these messages.
+                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entry.getDataBuffer());

Review Comment:
   > I feel that this is an expected behavior, it's better not to set TTL for topics that use delayed messages
   
   +1


