Posted to commits@pulsar.apache.org by "coderzc (via GitHub)" <gi...@apache.org> on 2023/08/11 16:36:14 UTC

[GitHub] [pulsar] coderzc opened a new pull request, #20980: [fix][broker] Fix message loss during topic compaction

coderzc opened a new pull request, #20980:
URL: https://github.com/apache/pulsar/pull/20980

   ### Motivation
   
   If a batch contains a message with a null value (i.e. `payloadSize == 0`) and `PhaseOneResult.from` has not been set yet, the compactor may skip the whole batch in phase two, because phase two starts reading at `PhaseOneResult.from`. This leads to message loss during topic compaction.
   
   `PhaseOneResult.to` has the same problem: when a message's value is null, `PhaseOneResult.to` is not updated, so `PhaseOneResult.to` may end up less than `lastReadId`. A null value for a key means the key should be deleted, so I think we should compact such messages immediately instead of ignoring them for now.
   
   https://github.com/apache/pulsar/blob/98628845aa94b416b0c3711b5a15819ade549c57/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java#L126-L166
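   To make the first paragraph concrete, here is a minimal, hypothetical model of the two phases in plain Java (not Pulsar's `RawReader`/`MessageId` API; every name below is made up for illustration). It assumes, as a simplification, that the old code marks an entry as `deletedMessage` when any message in its batch has a null value, and that phase two never reads entries before `from`. When the first entry is a batch that mixes a live key with a null value, the old rule moves `from` past that batch, so key `a` never reaches the compacted output.

   ```java
   import java.util.*;

   /** Hypothetical, simplified model of the two-phase compactor; not Pulsar's API. */
   public class CompactionLossSketch {
       record Msg(String key, int size) {}            // size == 0 models a null value (tombstone)
       record Entry(long id, List<Msg> batch) {}      // one ledger entry holding a batch
       record MsgId(long entry, int index) {}         // (entry id, batch index)

       /** Phase one: remember the latest message id per key; a null value deletes the key. */
       static Map<String, MsgId> latestForKey(List<Entry> entries) {
           Map<String, MsgId> latest = new HashMap<>();
           for (Entry e : entries) {
               for (int i = 0; i < e.batch().size(); i++) {
                   Msg m = e.batch().get(i);
                   if (m.size() > 0) {
                       latest.put(m.key(), new MsgId(e.id(), i));
                   } else {
                       latest.remove(m.key());
                   }
               }
           }
           return latest;
       }

       /** Old rule (assumed): `from` skips any entry whose batch contains a null value. */
       static long oldFrom(List<Entry> entries) {
           Long first = null;
           for (Entry e : entries) {
               boolean deletedMessage = e.batch().stream().anyMatch(m -> m.size() == 0);
               if (first == null && !deletedMessage) {
                   first = e.id();
               }
           }
           // Mirrors `first == null ? id : first`, where id is the last message read.
           return first == null ? entries.get(entries.size() - 1).id() : first;
       }

       /** New rule: `from` is simply the first entry read. */
       static long newFrom(List<Entry> entries) {
           return entries.get(0).id();
       }

       /** Phase two: start reading at `from`, keep only the latest message per key. */
       static Set<String> phaseTwo(List<Entry> entries, long from, Map<String, MsgId> latest) {
           Set<String> kept = new TreeSet<>();
           for (Entry e : entries) {
               if (e.id() < from) {
                   continue;                          // entries before `from` are never read
               }
               for (int i = 0; i < e.batch().size(); i++) {
                   Msg m = e.batch().get(i);
                   if (new MsgId(e.id(), i).equals(latest.get(m.key()))) {
                       kept.add(m.key());
                   }
               }
           }
           return kept;
       }

       public static void main(String[] args) {
           // Entry 0 mixes a live value for "a" with a null value for "b".
           List<Entry> entries = List.of(
                   new Entry(0, List.of(new Msg("a", 10), new Msg("b", 0))),
                   new Entry(1, List.of(new Msg("c", 10))));
           Map<String, MsgId> latest = latestForKey(entries);
           System.out.println("old: " + phaseTwo(entries, oldFrom(entries), latest)); // old: [c]
           System.out.println("new: " + phaseTwo(entries, newFrom(entries), latest)); // new: [a, c]
       }
   }
   ```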
   
   ### Modifications
   
   * Always set `PhaseOneResult.from` to the first message read from the RawReader in phase one, even if that message has a null value, since producers do not normally send a null value as the very first message.
   
   * Remove `PhaseOneResult.to` and use `lastReadId` to replace `PhaseOneResult.to` in phase two.
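   A hypothetical sketch of what phase one computes after these two modifications. The names `PhaseOneResult`, `from` and `lastId` mirror the new constructor call `new PhaseOneResult(first, lastMessageId, latestForKey)` in the diff, but the types are simplified (`long` ids instead of `MessageId`) and the loop is synchronous rather than the recursive, asynchronous `phaseOneLoop`. Hand-tracing the old rules from the diff on the same input gives `from = 1` and `to = 1`; the sketch returns `from = 0` and `lastId = 2`, so phase two covers every message phase one read.

   ```java
   import java.util.*;

   /** Hypothetical sketch of the phase-one result after this change; there is no `to` field. */
   public class PhaseOneResultSketch {
       record Msg(long id, String key, int size) {}       // size == 0 means a null value
       record PhaseOneResult(long from, long lastId, Map<String, Long> latestForKey) {}

       /** Assumes msgs is non-empty (orElseThrow fails otherwise). */
       static PhaseOneResult phaseOne(List<Msg> msgs) {
           Optional<Long> firstMessageId = Optional.empty();
           Map<String, Long> latestForKey = new HashMap<>();
           long lastId = -1;
           for (Msg m : msgs) {
               if (m.size() > 0) {
                   latestForKey.put(m.key(), m.id());
               } else {
                   latestForKey.remove(m.key());           // a null value deletes the key
               }
               // Same rule as `firstMessageId.orElse(id)`: a null-value message is not skipped.
               firstMessageId = Optional.of(firstMessageId.orElse(m.id()));
               lastId = m.id();
           }
           return new PhaseOneResult(firstMessageId.orElseThrow(), lastId, latestForKey);
       }

       public static void main(String[] args) {
           List<Msg> msgs = List.of(
                   new Msg(0, "a", 0),     // null value read first
                   new Msg(1, "b", 5),
                   new Msg(2, "b", 0));    // trailing null value
           PhaseOneResult r = phaseOne(msgs);
           System.out.println(r.from() + " " + r.lastId() + " " + r.latestForKey()); // 0 2 {}
       }
   }
   ```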
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   
   ### Documentation
   
   - [ ] `doc`
   - [ ] `doc-required`
   - [x] `doc-not-needed`
   - [ ] `doc-complete`
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] coderzc closed pull request #20980: [fix][broker] Fix message loss during topic compaction

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc closed pull request #20980: [fix][broker] Fix message loss during topic compaction
URL: https://github.com/apache/pulsar/pull/20980




[GitHub] [pulsar] coderzc commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1293252538


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -271,7 +266,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
                                         promise.completeExceptionally(exception2);
                                     }
                                 });
-                        if (to.equals(id)) {
+                        if (lastReadId.equals(id)) {

Review Comment:
   Another PR will fix it; please see https://github.com/apache/pulsar/pull/20988





[GitHub] [pulsar] codecov-commenter commented on pull request #20980: [fix][broker] Fix message loss during topic compaction

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#issuecomment-1675853798

   ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/20980?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#20980](https://app.codecov.io/gh/apache/pulsar/pull/20980?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (b55e6b9) into [master](https://app.codecov.io/gh/apache/pulsar/commit/98628845aa94b416b0c3711b5a15819ade549c57?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (9862884) will **increase** coverage by `39.58%`.
   > Report is 3 commits behind head on master.
   > The diff coverage is `100.00%`.
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #20980       +/-   ##
   =============================================
   + Coverage     33.53%   73.11%   +39.58%     
   - Complexity    12174    32244    +20070     
   =============================================
     Files          1621     1875      +254     
     Lines        126919   139416    +12497     
     Branches      13851    15328     +1477     
   =============================================
   + Hits          42561   101938    +59377     
   + Misses        78745    29417    -49328     
   - Partials       5613     8061     +2448     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | inttests | `24.23% <18.75%> (-0.04%)` | :arrow_down: |
   | systests | `25.20% <18.75%> (?)` | |
   | unittests | `72.39% <100.00%> (+40.33%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Files Changed](https://app.codecov.io/gh/apache/pulsar/pull/20980?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...g/apache/pulsar/client/impl/RawBatchConverter.java](https://app.codecov.io/gh/apache/pulsar/pull/20980?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1Jhd0JhdGNoQ29udmVydGVyLmphdmE=) | `93.50% <100.00%> (+19.82%)` | :arrow_up: |
   | [...rg/apache/pulsar/compaction/TwoPhaseCompactor.java](https://app.codecov.io/gh/apache/pulsar/pull/20980?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbXBhY3Rpb24vVHdvUGhhc2VDb21wYWN0b3IuamF2YQ==) | `75.22% <100.00%> (+5.03%)` | :arrow_up: |
   
   ... and [1520 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/20980/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   




[GitHub] [pulsar] coderzc commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1291592568


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -271,7 +266,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
                                         promise.completeExceptionally(exception2);
                                     }
                                 });
-                        if (to.equals(id)) {
+                        if (lastReadId.equals(id)) {

Review Comment:
   There seems to be a `return` missing at line 275; we may need to open another PR to fix it.





[GitHub] [pulsar] Technoboy- merged pull request #20980: [fix][broker] Fix message loss during topic compaction

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- merged PR #20980:
URL: https://github.com/apache/pulsar/pull/20980




[GitHub] [pulsar] coderzc commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1291592568


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -271,7 +266,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
                                         promise.completeExceptionally(exception2);
                                     }
                                 });
-                        if (to.equals(id)) {
+                        if (lastReadId.equals(id)) {

Review Comment:
   It seems a `return` is missing at line 275; we may need to open another PR to fix it.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1293045876


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -122,27 +122,36 @@ private void phaseOneLoop(RawReader reader,
                 () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));
 
         future.thenAcceptAsync(m -> {
-            try {
+            try (m) {
                 MessageId id = m.getMessageId();
                 boolean deletedMessage = false;
                 boolean replaceMessage = false;
                 mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
-                if (RawBatchConverter.isReadableBatch(m)) {
+                MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
+                if (RawBatchConverter.isReadableBatch(metadata)) {
                     try {
+                        int numMessagesInBatch = metadata.getNumMessagesInBatch();
+                        int deleteCnt = 0;
                         for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
+                             boolean singleDeletedMessage = false;
+                             boolean singleReplaceMessage = false;

Review Comment:
   It looks like we don't need to add the two variables above; the code is easier to read without them:
   
   ```java
   for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
       if (e != null) {
           if (e.getRight() > 0) {
               MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
               if (old != null) {
                   mxBean.addCompactionRemovedEvent(reader.getTopic());
               }
           } else {
               latestForKey.remove(e.getMiddle());
               deleteCnt++;
               mxBean.addCompactionRemovedEvent(reader.getTopic());
           }
       }
   }
   ```
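   For readers following the suggestion above, here is a self-contained, runnable version of the same loop. It is a sketch under several assumptions: a hypothetical `IdKeySize` record stands in for commons-lang3's `ImmutableTriple<MessageId, String, Integer>`, string ids stand in for `MessageId`, the `mxBean.addCompactionRemovedEvent(...)` calls become a counter, and the entry is treated as deleted only when `deleteCnt == numMessagesInBatch`. That last rule is my reading of why the diff introduces those two variables, not something the hunk shows.

   ```java
   import java.util.*;

   /** Hypothetical, self-contained model of the suggested batch loop; not Pulsar code. */
   public class BatchScanSketch {
       // Stands in for ImmutableTriple<MessageId, String, Integer>: (message id, key, payload size).
       record IdKeySize(String id, String key, int size) {}

       // deletedMessage: every message in the batch was a null value; removedEvents: metric count.
       record ScanResult(boolean deletedMessage, int removedEvents) {}

       static ScanResult scanBatch(List<IdKeySize> batch, Map<String, String> latestForKey) {
           int numMessagesInBatch = batch.size();
           int deleteCnt = 0;
           int removedEvents = 0;
           for (IdKeySize e : batch) {
               if (e != null) {
                   if (e.size() > 0) {
                       String old = latestForKey.put(e.key(), e.id());
                       if (old != null) {
                           removedEvents++;              // an older value for this key is superseded
                       }
                   } else {
                       latestForKey.remove(e.key());     // null value: the key is deleted
                       deleteCnt++;
                       removedEvents++;
                   }
               }
           }
           // Assumed rule: only a batch made entirely of null values counts as a deleted entry.
           return new ScanResult(deleteCnt == numMessagesInBatch, removedEvents);
       }

       public static void main(String[] args) {
           Map<String, String> latestForKey = new HashMap<>();
           // Batch 0: a live value for "a" and a null value for "b", so not fully deleted.
           ScanResult r0 = scanBatch(Arrays.asList(
                   new IdKeySize("0:0", "a", 3), new IdKeySize("0:1", "b", 0)), latestForKey);
           // Batch 1: only null values, so the whole batch counts as deleted.
           ScanResult r1 = scanBatch(Arrays.asList(
                   new IdKeySize("1:0", "a", 0), new IdKeySize("1:1", "c", 0)), latestForKey);
           System.out.println(r0 + " " + r1 + " " + latestForKey);
       }
   }
   ```

   Under that assumption, a batch that mixes live values and null values is no longer treated as a whole-entry delete, which is the situation that let `from` skip such a batch before.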





[GitHub] [pulsar] coderzc commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1293237133


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -122,27 +122,36 @@ private void phaseOneLoop(RawReader reader,
                 () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));
 
         future.thenAcceptAsync(m -> {
-            try {
+            try (m) {
                 MessageId id = m.getMessageId();
                 boolean deletedMessage = false;
                 boolean replaceMessage = false;
                 mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
-                if (RawBatchConverter.isReadableBatch(m)) {
+                MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
+                if (RawBatchConverter.isReadableBatch(metadata)) {
                     try {
+                        int numMessagesInBatch = metadata.getNumMessagesInBatch();
+                        int deleteCnt = 0;
                         for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
+                             boolean singleDeletedMessage = false;
+                             boolean singleReplaceMessage = false;

Review Comment:
   Good idea.





[GitHub] [pulsar] gaoran10 commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1292074537


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -271,7 +266,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
                                         promise.completeExceptionally(exception2);
                                     }
                                 });
-                        if (to.equals(id)) {
+                        if (lastReadId.equals(id)) {

Review Comment:
   Yes. If we don't stop phase two there, the next entry may be dropped because it does not exist in `latestForKey`.
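   A small, synchronous model of the issue described here, with hypothetical names throughout (the real `phaseTwoLoop` is asynchronous and writes to a BookKeeper ledger). It assumes message 2 was published after phase one finished, so phase one's `latestForKey` has no entry for key `c`. Without a `return` after completing the promise, the loop keeps reading past `lastReadId` and consumes message 2 without keeping it.

   ```java
   import java.util.*;
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical, synchronous model of the end-of-loop check in phase two. */
   public class PhaseTwoStopSketch {
       record Msg(long id, String key) {}

       static void phaseTwoLoop(List<Msg> topic, int pos, long lastReadId,
                                Map<String, Long> latestForKey, List<Long> readIds,
                                List<Long> compacted, boolean returnAtEnd,
                                CompletableFuture<Void> promise) {
           if (pos >= topic.size()) {
               promise.complete(null);                 // nothing more to read in this model
               return;
           }
           Msg m = topic.get(pos);
           readIds.add(m.id());
           // Keep the message only if phase one recorded it as the latest value for its key.
           if (Long.valueOf(m.id()).equals(latestForKey.get(m.key()))) {
               compacted.add(m.id());
           }
           if (m.id() == lastReadId) {
               promise.complete(null);
               if (returnAtEnd) {
                   return;                             // the fix: stop once lastReadId is reached
               }
               // Without the return, execution falls through and keeps reading.
           }
           phaseTwoLoop(topic, pos + 1, lastReadId, latestForKey, readIds, compacted,
                   returnAtEnd, promise);
       }

       public static void main(String[] args) {
           // Phase one ended at id 1; id 2 arrived later, so latestForKey has no entry for "c".
           List<Msg> topic = List.of(new Msg(0, "a"), new Msg(1, "b"), new Msg(2, "c"));
           Map<String, Long> latestForKey = Map.of("a", 0L, "b", 1L);
           for (boolean returnAtEnd : new boolean[] {false, true}) {
               List<Long> readIds = new ArrayList<>();
               List<Long> compacted = new ArrayList<>();
               phaseTwoLoop(topic, 0, 1L, latestForKey, readIds, compacted, returnAtEnd,
                       new CompletableFuture<>());
               System.out.println("return=" + returnAtEnd + " read=" + readIds
                       + " compacted=" + compacted);
           }
       }
   }
   ```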



##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -162,46 +161,42 @@ private void phaseOneLoop(RawReader reader,
                         mxBean.addCompactionRemovedEvent(reader.getTopic());
                     }
                 }
-                MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
-                MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
+                MessageId first = firstMessageId.orElse(id);
                 if (id.compareTo(lastMessageId) == 0) {
-                    loopPromise.complete(new PhaseOneResult(first == null ? id : first, to == null ? id : to,
-                            lastMessageId, latestForKey));
+                    loopPromise.complete(new PhaseOneResult(first, lastMessageId, latestForKey));
                 } else {
                     phaseOneLoop(reader,
-                            Optional.ofNullable(first),
-                            Optional.ofNullable(to),
+                            Optional.of(first),
                             lastMessageId,
                             latestForKey, loopPromise);
                 }
-            } finally {
-                m.close();
             }
         }, scheduler).exceptionally(ex -> {
             loopPromise.completeExceptionally(ex);
             return null;
         });
     }
 
-    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId,
-            Map<String, MessageId> latestForKey, BookKeeper bk) {
+    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId lastReadId,

Review Comment:
   Do we need to remove the param `to`?





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1293129942


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -122,27 +122,36 @@ private void phaseOneLoop(RawReader reader,
                 () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));
 
         future.thenAcceptAsync(m -> {
-            try {
+            try (m) {
                 MessageId id = m.getMessageId();
                 boolean deletedMessage = false;
                 boolean replaceMessage = false;
                 mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
-                if (RawBatchConverter.isReadableBatch(m)) {
+                MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
+                if (RawBatchConverter.isReadableBatch(metadata)) {
                     try {
+                        int numMessagesInBatch = metadata.getNumMessagesInBatch();
+                        int deleteCnt = 0;
                         for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
+                             boolean singleDeletedMessage = false;
+                             boolean singleReplaceMessage = false;

Review Comment:
   +1





[GitHub] [pulsar] coderzc commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1291592568


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -271,7 +266,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
                                         promise.completeExceptionally(exception2);
                                     }
                                 });
-                        if (to.equals(id)) {
+                        if (lastReadId.equals(id)) {

Review Comment:
   It seems a `return` is missing at line 287; we may need to open another PR to fix it.





[GitHub] [pulsar] coderzc commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1291592568


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -271,7 +266,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
                                         promise.completeExceptionally(exception2);
                                     }
                                 });
-                        if (to.equals(id)) {
+                        if (lastReadId.equals(id)) {

Review Comment:
   It seems a `return` is missing at line 275; we may need to open another PR to fix it.


