Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/12/28 09:11:55 UTC

[GitHub] [pulsar] codelipenghui opened a new pull request #13533: Return message ID from compacted ledger while the compaction cursor reach the end of the topic

codelipenghui opened a new pull request #13533:
URL: https://github.com/apache/pulsar/pull/13533


   ### Motivation
   
   The problem occurs when the compaction cursor reaches the end of the topic but the tail messages
   of the topic were removed during compaction because the producer wrote null-value messages for their keys.
   
   For example:
   
   - 5 messages in the original topic with keys: 0, 1, 2, 3, 4
   - the corresponding message IDs are: 1:0, 1:1, 1:2, 1:3, 1:4
   - the producer sends null-value messages for keys 3 and 4
   - the topic compaction task is triggered
   
   After the compaction task completes:
   
   - 5 messages in the original topic: 1:0, 1:1, 1:2, 1:3, 1:4
   - 3 messages in the compacted ledger: 1:0, 1:1, 1:2
   
   At this point, if a reader asks for the last message ID of the topic, the broker
   should return `1:2`, not `1:4`, because the reader cannot read the messages with
   keys `3` and `4` from the compacted topic. Otherwise, the reader concludes that more
   messages are available, and `reader.readNext()` blocks until a new message is written to the topic.
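To make the failure concrete, here is a minimal, self-contained Java sketch (not Pulsar code) of key-based compaction plus a simplified `hasMessageAvailable()` check. `MsgId`, `Msg`, `CompactionSketch`, and `CompactionExample` are hypothetical names; real Pulsar message IDs and reader logic (batch indexes, for example) are more involved. In this sketch the null-value messages for keys 3 and 4 are appended after the original five messages, so they take entry IDs 1:5 and 1:6.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CompactionExample {
    // Hypothetical example topic: keys 0..4 with values, then null values for keys 3 and 4.
    static List<Msg> exampleTopic() {
        List<Msg> topic = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            topic.add(new Msg(new MsgId(1, i), String.valueOf(i), "value-" + i));
        }
        topic.add(new Msg(new MsgId(1, 5), "3", null)); // tombstone for key 3
        topic.add(new Msg(new MsgId(1, 6), "4", null)); // tombstone for key 4
        return topic;
    }

    public static void main(String[] args) {
        List<Msg> topic = exampleTopic();
        List<Msg> compacted = CompactionSketch.compact(topic);
        MsgId originalLast = topic.get(topic.size() - 1).id();
        MsgId compactedLast = compacted.get(compacted.size() - 1).id();
        System.out.println(compactedLast);
        // The reader has consumed everything in the compacted ledger, up to compactedLast.
        System.out.println(CompactionSketch.hasMessageAvailable(compactedLast, originalLast));
        System.out.println(CompactionSketch.hasMessageAvailable(compactedLast, compactedLast));
    }
}

final class CompactionSketch {
    // Keeps the latest message per key and drops keys whose latest value is null.
    static List<Msg> compact(List<Msg> topic) {
        Map<String, Msg> latest = new HashMap<>();
        for (Msg m : topic) {
            if (m.value() == null) {
                latest.remove(m.key()); // a null value deletes the key
            } else {
                latest.put(m.key(), m); // a newer value replaces the older one
            }
        }
        List<Msg> compacted = new ArrayList<>(latest.values());
        compacted.sort(Comparator.comparing(Msg::id)); // compacted entries keep their original IDs
        return compacted;
    }

    // Simplified check: more data exists only if the reader is behind the last message ID.
    static boolean hasMessageAvailable(MsgId lastRead, MsgId lastMessageId) {
        return lastRead.compareTo(lastMessageId) < 0;
    }
}

// Hypothetical message ID, ordered by ledger ID, then entry ID.
record MsgId(long ledgerId, long entryId) implements Comparable<MsgId> {
    @Override
    public int compareTo(MsgId other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }

    @Override
    public String toString() {
        return ledgerId + ":" + entryId;
    }
}

// Hypothetical message: an ID, a key, and a possibly null value.
record Msg(MsgId id, String key, String value) {}
```

Running `CompactionExample.main` prints `1:2`, then `true` (the stale last ID from the original topic makes the reader expect more data, so `readNext()` would block), then `false` once the last ID comes from the compacted ledger.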
   
   ### Modifications
   
   The fix is straightforward: when the broker receives a get-last-message-ID request,
   it checks whether the compaction cursor has reached the end of the original topic.
   If it has, the broker responds with the last message ID from the compacted ledger.
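A minimal Java sketch of that decision, assuming a simplified position type. `Pos` and `LastMessageIdSketch` are hypothetical names; the real logic lives in `ServerCnx` and works with `PositionImpl` and the compacted ledger, and it handles cases (such as batch indexes and invalid entry IDs) that this sketch ignores. The sketch also assumes the compacted ledger is non-empty.

```java
import java.util.Optional;

final class LastMessageIdSketch {
    // True when every entry up to lastPosition has already been compacted,
    // i.e. the compaction cursor has reached the end of the original topic.
    static boolean compactionReachedEnd(Pos lastPosition, Optional<Pos> compactionHorizon) {
        return compactionHorizon.isPresent() && lastPosition.compareTo(compactionHorizon.get()) <= 0;
    }

    // Picks the ID to return for a get-last-message-ID request.
    // compactedLedgerLast is assumed to be the last entry of a non-empty compacted ledger.
    static Pos lastMessageId(Pos lastPosition, Optional<Pos> compactionHorizon, Pos compactedLedgerLast) {
        return compactionReachedEnd(lastPosition, compactionHorizon) ? compactedLedgerLast : lastPosition;
    }
}

// Hypothetical ledger position, ordered by ledger ID, then entry ID.
record Pos(long ledgerId, long entryId) implements Comparable<Pos> {
    @Override
    public int compareTo(Pos other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }
}
```

With a last position of 1:4, a compaction horizon of 1:4, and a compacted ledger ending at 1:2, `lastMessageId` returns 1:2. With a horizon of 1:3 (compaction has not caught up yet) or no horizon at all, it returns 1:4 from the original topic.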
   
   ### Verifying this change
   
   A new test, `testHasMessageAvailableWithNullValueMessage`, ensures that `hasMessageAvailable()`
   returns false when there are no more messages to read from the compacted topic and the compaction cursor has reached the end of the topic.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
   Check the box below and label this PR (if you have committer privilege).
   
   Need to update docs? 
   
   - [ ] `doc-required` 
     
     (If you need help on updating docs, create a doc issue)
     
   - [x] `no-need-doc` 
     
     (Please explain why)
     
   - [ ] `doc` 
     
     (If this PR contains doc changes)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #13533: Return message ID from compacted ledger while the compaction cursor reach the end of the topic

codelipenghui commented on a change in pull request #13533:
URL: https://github.com/apache/pulsar/pull/13533#discussion_r775825032



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1729,7 +1729,10 @@ private void getLargestBatchIndexWhenPossible(
         ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
 
         // If it's not pointing to a valid entry, respond messageId of the current position.
-        if (lastPosition.getEntryId() == -1) {
+        // If the compaction cursor reach the end of the topic, respond messageId from compacted ledger
+        Optional<Position> compactionHorizon = persistentTopic.getCompactedTopic().getCompactionHorizon();
+        if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent()
+                        && lastPosition.compareTo((PositionImpl) compactionHorizon.get()) <= 0)) {

Review comment:
       Yes, the unit tests cover the case where the horizon equals the last message ID. Theoretically, the horizon should never be greater than the last message ID.







[GitHub] [pulsar] codelipenghui merged pull request #13533: Return message ID from compacted ledger while the compaction cursor reach the end of the topic

codelipenghui merged pull request #13533:
URL: https://github.com/apache/pulsar/pull/13533


   





[GitHub] [pulsar] codelipenghui commented on a change in pull request #13533: Return message ID from compacted ledger while the compaction cursor reach the end of the topic

codelipenghui commented on a change in pull request #13533:
URL: https://github.com/apache/pulsar/pull/13533#discussion_r775823434



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1729,7 +1729,10 @@ private void getLargestBatchIndexWhenPossible(
         ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
 
         // If it's not pointing to a valid entry, respond messageId of the current position.
-        if (lastPosition.getEntryId() == -1) {
+        // If the compaction cursor reach the end of the topic, respond messageId from compacted ledger
+        Optional<Position> compactionHorizon = persistentTopic.getCompactedTopic().getCompactionHorizon();
+        if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent()
+                        && lastPosition.compareTo((PositionImpl) compactionHorizon.get()) <= 0)) {

Review comment:
       - compaction horizon < last message -> the compaction cursor has not reached the end of the original topic, which means not all the data has been compacted, so we should return the last message ID from the original topic.
       - compaction horizon >= last message -> the compaction cursor has reached the end of the original topic, which means all the data has been compacted, so we should return the last message ID from the compacted ledger.







[GitHub] [pulsar] 315157973 commented on a change in pull request #13533: Return message ID from compacted ledger while the compaction cursor reach the end of the topic

315157973 commented on a change in pull request #13533:
URL: https://github.com/apache/pulsar/pull/13533#discussion_r775808367



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
##########
@@ -311,9 +310,8 @@ private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m)
             .compare(p.getEntryId(), m.getEntryId()).result();
     }
 
-    @VisibleForTesting
-    PositionImpl getCompactionHorizon() {
-        return this.compactionHorizon;
+    public synchronized Optional<Position> getCompactionHorizon() {

Review comment:
       Why should we use `synchronized`?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1729,7 +1729,10 @@ private void getLargestBatchIndexWhenPossible(
         ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
 
         // If it's not pointing to a valid entry, respond messageId of the current position.
-        if (lastPosition.getEntryId() == -1) {
+        // If the compaction cursor reach the end of the topic, respond messageId from compacted ledger
+        Optional<Position> compactionHorizon = persistentTopic.getCompactedTopic().getCompactionHorizon();
+        if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent()
+                        && lastPosition.compareTo((PositionImpl) compactionHorizon.get()) <= 0)) {

Review comment:
       Should it be `lastPosition.compareTo((PositionImpl) compactionHorizon.get()) >= 0`?
   Does the unit test cover this?







[GitHub] [pulsar] codelipenghui commented on a change in pull request #13533: Return message ID from compacted ledger while the compaction cursor reach the end of the topic

codelipenghui commented on a change in pull request #13533:
URL: https://github.com/apache/pulsar/pull/13533#discussion_r775821748



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
##########
@@ -311,9 +310,8 @@ private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m)
             .compare(p.getEntryId(), m.getEntryId()).result();
     }
 
-    @VisibleForTesting
-    PositionImpl getCompactionHorizon() {
-        return this.compactionHorizon;
+    public synchronized Optional<Position> getCompactionHorizon() {

Review comment:
       Another thread updates `compactionHorizon`: https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java#L70
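A minimal sketch of the visibility concern behind `synchronized`, assuming a simplified class: `CompactedTopicSketch`, `HorizonPos`, and `updateHorizon` are hypothetical names, not Pulsar APIs. One thread publishes a new horizon after compaction while connection threads read it; when both the writer and the getter synchronize on the same monitor, a reader that acquires the lock after the write has completed is guaranteed to see the new value.

```java
import java.util.Optional;

final class CompactedTopicSketch {
    // Written by the compaction thread, read by broker connection threads.
    private HorizonPos compactionHorizon; // null until the first compaction completes

    // Writer side (hypothetical): publishes a new horizon under this object's monitor.
    synchronized void updateHorizon(HorizonPos horizon) {
        this.compactionHorizon = horizon;
    }

    // Reader side: taking the same monitor makes the latest completed write visible.
    synchronized Optional<HorizonPos> getCompactionHorizon() {
        return Optional.ofNullable(compactionHorizon);
    }

    public static void main(String[] args) throws InterruptedException {
        CompactedTopicSketch topic = new CompactedTopicSketch();
        Thread compactor = new Thread(() -> topic.updateHorizon(new HorizonPos(1, 2)));
        compactor.start();
        // A concurrent read may return Optional.empty() or the new horizon, depending on timing.
        System.out.println(topic.getCompactionHorizon());
        compactor.join();
        // After join(), the update has completed, so the new horizon is returned.
        System.out.println(topic.getCompactionHorizon().orElseThrow());
    }
}

// Hypothetical stand-in for PositionImpl.
record HorizonPos(long ledgerId, long entryId) {}
```

For a single reference field, `volatile` would also provide visibility; a shared lock is mainly useful if the horizon is updated together with other state that must change atomically.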



