Posted to commits@pulsar.apache.org by "coderzc (via GitHub)" <gi...@apache.org> on 2023/08/11 16:36:14 UTC
[GitHub] [pulsar] coderzc opened a new pull request, #20980: [fix][broker] Fix message loss during topic compaction
coderzc opened a new pull request, #20980:
URL: https://github.com/apache/pulsar/pull/20980
### Motivation
If a batch contains a message with a null value (i.e. payloadSize == 0) and `PhaseOneResult.from` has not been set yet, the compactor may skip the whole batch in phase two based on `PhaseOneResult.from`, which leads to message loss during topic compaction.
`PhaseOneResult.to` has the same problem: when the value of a message is null, `PhaseOneResult.to` is not updated, so `PhaseOneResult.to` may be less than `lastReadId`. I think a null value for a key means that the key should be deleted, so we should compact such messages immediately instead of ignoring them for now.
https://github.com/apache/pulsar/blob/98628845aa94b416b0c3711b5a15819ade549c57/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java#L126-L166
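The effect on `PhaseOneResult.from` can be illustrated with a small standalone model. This is a sketch, not the broker code: `Entry`, `fromBeforeFix`, and `fromAfterFix` are hypothetical names, and plain `long` ids stand in for `MessageId`. It assumes, as described above, that a batch containing a null-value message is flagged as deleted and that phase two skips everything before `from`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class PhaseOneFromSketch {

    /** Hypothetical stand-in for a raw entry: an id plus a "contains a null-value message" flag. */
    static final class Entry {
        final long id;
        final boolean hasNullValue;

        Entry(long id, boolean hasNullValue) {
            this.id = id;
            this.hasNullValue = hasNullValue;
        }
    }

    /** Rule before the fix: an entry flagged as deleted cannot become `from`. */
    static long fromBeforeFix(List<Entry> entries) {
        Optional<Long> from = Optional.empty();
        long lastId = -1;
        for (Entry e : entries) {
            lastId = e.id;
            Long candidate = e.hasNullValue ? null : Long.valueOf(e.id);
            from = Optional.ofNullable(from.orElse(candidate));
        }
        // Mirrors `first == null ? id : first` when phase one completes.
        return from.orElse(lastId);
    }

    /** Rule after the fix: the first entry read always becomes `from`. */
    static long fromAfterFix(List<Entry> entries) {
        Optional<Long> from = Optional.empty();
        for (Entry e : entries) {
            from = Optional.of(from.orElse(e.id));
        }
        // -1 only for an empty input, which this sketch does not otherwise handle.
        return from.orElse(-1L);
    }

    public static void main(String[] args) {
        // Entry 0 is a batch that mixes live messages with one null-value message.
        List<Entry> entries = Arrays.asList(
                new Entry(0, true), new Entry(1, false), new Entry(2, false));
        System.out.println("from before fix: " + fromBeforeFix(entries));
        System.out.println("from after fix:  " + fromAfterFix(entries));
    }
}
```

With this sample input the old rule starts at entry 1, so the live messages in entry 0 would never be copied, while the new rule starts at entry 0.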
### Modifications
* Make `PhaseOneResult.from` always set to the first message read from the RawReader in phase one, since nobody initially sends a message with a null value.
* Remove `PhaseOneResult.to` and use `lastReadId` to replace `PhaseOneResult.to` in phase two.
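
Why `PhaseOneResult.to` can fall behind `lastReadId` can be sketched with a standalone model. This is an illustration under assumptions: `toBeforeFix` is a hypothetical helper that replays the old update rule, in which `to` only advances on entries that are not flagged as deleted, and `long` ids stand in for `MessageId`.

```java
import java.util.Optional;

public class PhaseOneToSketch {

    /**
     * Replays the pre-fix rule `to = deletedMessage ? toMessageId.orElse(null) : id`
     * over entries with ids 0..n-1; deleted[i] marks entry i as flagged deleted.
     */
    static long toBeforeFix(boolean[] deleted) {
        Optional<Long> to = Optional.empty();
        long id = -1;
        for (int i = 0; i < deleted.length; i++) {
            id = i;
            Long next = deleted[i] ? to.orElse(null) : Long.valueOf(id);
            to = Optional.ofNullable(next);
        }
        // Mirrors `to == null ? id : to` when phase one completes.
        return to.orElse(id);
    }

    public static void main(String[] args) {
        // The last entry read (id 2) carries a null value, so `to` stays at 1.
        boolean[] deleted = {false, false, true};
        long lastReadId = deleted.length - 1;
        System.out.println("to before fix: " + toBeforeFix(deleted) + ", lastReadId: " + lastReadId);
    }
}
```

Using `lastReadId` as the stop condition in phase two removes the need to track this second value at all.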
### Verifying this change
- [x] Make sure that the change passes the CI checks.
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] coderzc closed pull request #20980: [fix][broker] Fix message loss during topic compaction
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc closed pull request #20980: [fix][broker] Fix message loss during topic compaction
URL: https://github.com/apache/pulsar/pull/20980
[GitHub] [pulsar] coderzc commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1293252538
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -271,7 +266,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
promise.completeExceptionally(exception2);
}
});
- if (to.equals(id)) {
+ if (lastReadId.equals(id)) {
Review Comment:
Another PR will fix it, please see https://github.com/apache/pulsar/pull/20988
[GitHub] [pulsar] codecov-commenter commented on pull request #20980: [fix][broker] Fix message loss during topic compaction
Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#issuecomment-1675853798
## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/20980?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
> Merging [#20980](https://app.codecov.io/gh/apache/pulsar/pull/20980?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (b55e6b9) into [master](https://app.codecov.io/gh/apache/pulsar/commit/98628845aa94b416b0c3711b5a15819ade549c57?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (9862884) will **increase** coverage by `39.58%`.
> Report is 3 commits behind head on master.
> The diff coverage is `100.00%`.
```diff
@@              Coverage Diff              @@
##             master   #20980       +/-   ##
=============================================
+ Coverage     33.53%   73.11%   +39.58%
- Complexity    12174    32244    +20070
=============================================
  Files          1621     1875      +254
  Lines        126919   139416    +12497
  Branches      13851    15328     +1477
=============================================
+ Hits          42561   101938    +59377
+ Misses        78745    29417    -49328
- Partials       5613     8061     +2448
```
| Flag | Coverage Δ | |
|---|---|---|
| inttests | `24.23% <18.75%> (-0.04%)` | :arrow_down: |
| systests | `25.20% <18.75%> (?)` | |
| unittests | `72.39% <100.00%> (+40.33%)` | :arrow_up: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Files Changed](https://app.codecov.io/gh/apache/pulsar/pull/20980?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [...g/apache/pulsar/client/impl/RawBatchConverter.java](https://app.codecov.io/gh/apache/pulsar/pull/20980?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1Jhd0JhdGNoQ29udmVydGVyLmphdmE=) | `93.50% <100.00%> (+19.82%)` | :arrow_up: |
| [...rg/apache/pulsar/compaction/TwoPhaseCompactor.java](https://app.codecov.io/gh/apache/pulsar/pull/20980?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbXBhY3Rpb24vVHdvUGhhc2VDb21wYWN0b3IuamF2YQ==) | `75.22% <100.00%> (+5.03%)` | :arrow_up: |
... and [1520 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/20980/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
[GitHub] [pulsar] coderzc commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1291592568
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -271,7 +266,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
promise.completeExceptionally(exception2);
}
});
- if (to.equals(id)) {
+ if (lastReadId.equals(id)) {
Review Comment:
There seems to be a `return` missing at line 275; we may need to open another PR to fix it.
[GitHub] [pulsar] Technoboy- merged pull request #20980: [fix][broker] Fix message loss during topic compaction
Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- merged PR #20980:
URL: https://github.com/apache/pulsar/pull/20980
[GitHub] [pulsar] codelipenghui commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction
Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1293045876
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -122,27 +122,36 @@ private void phaseOneLoop(RawReader reader,
                 () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));
         future.thenAcceptAsync(m -> {
-            try {
+            try (m) {
                 MessageId id = m.getMessageId();
                 boolean deletedMessage = false;
                 boolean replaceMessage = false;
                 mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
-                if (RawBatchConverter.isReadableBatch(m)) {
+                MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
+                if (RawBatchConverter.isReadableBatch(metadata)) {
                     try {
+                        int numMessagesInBatch = metadata.getNumMessagesInBatch();
+                        int deleteCnt = 0;
                         for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
+                            boolean singleDeletedMessage = false;
+                            boolean singleReplaceMessage = false;
Review Comment:
It looks like we don't need to add the two variables above; without them the code is easier to read:
```java
for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
    if (e != null) {
        if (e.getRight() > 0) {
            MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
            if (old != null) {
                mxBean.addCompactionRemovedEvent(reader.getTopic());
            }
        } else {
            latestForKey.remove(e.getMiddle());
            deleteCnt++;
            mxBean.addCompactionRemovedEvent(reader.getTopic());
        }
    }
}
```
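The suggested loop can be tried outside the broker with a minimal, self-contained model. This is a sketch under assumptions: `IdKeySize` is a hypothetical stand-in for `ImmutableTriple<MessageId, String, Integer>`, `long` ids replace `MessageId`, `applyBatch` is an invented name, and the `mxBean` metric calls are left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchScanSketch {

    /** Hypothetical stand-in for the (message id, key, payload size) triple. */
    static final class IdKeySize {
        final long id;
        final String key;
        final int size;

        IdKeySize(long id, String key, int size) {
            this.id = id;
            this.key = key;
            this.size = size;
        }
    }

    /** Applies one batch to latestForKey and returns how many of its messages were deletes. */
    static int applyBatch(List<IdKeySize> batch, Map<String, Long> latestForKey) {
        int deleteCnt = 0;
        for (IdKeySize e : batch) {
            if (e == null) {
                continue;
            }
            if (e.size > 0) {
                // A non-empty payload supersedes any earlier message for the same key.
                latestForKey.put(e.key, e.id);
            } else {
                // An empty payload marks the key as deleted.
                latestForKey.remove(e.key);
                deleteCnt++;
            }
        }
        return deleteCnt;
    }

    public static void main(String[] args) {
        Map<String, Long> latestForKey = new HashMap<>();
        List<IdKeySize> batch = Arrays.asList(
                new IdKeySize(1, "a", 10), new IdKeySize(2, "b", 10), new IdKeySize(3, "a", 0));
        int deleteCnt = applyBatch(batch, latestForKey);
        System.out.println("deleteCnt=" + deleteCnt + ", latestForKey=" + latestForKey);
    }
}
```

Comparing the returned count with `numMessagesInBatch` would let a caller tell a batch made up entirely of deletes from a mixed one; whether the PR uses the count in exactly this way is an assumption here.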
[GitHub] [pulsar] coderzc commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1293237133
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -122,27 +122,36 @@ private void phaseOneLoop(RawReader reader,
() -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));
future.thenAcceptAsync(m -> {
- try {
+ try (m) {
MessageId id = m.getMessageId();
boolean deletedMessage = false;
boolean replaceMessage = false;
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
- if (RawBatchConverter.isReadableBatch(m)) {
+ MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
+ if (RawBatchConverter.isReadableBatch(metadata)) {
try {
+ int numMessagesInBatch = metadata.getNumMessagesInBatch();
+ int deleteCnt = 0;
for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
+ boolean singleDeletedMessage = false;
+ boolean singleReplaceMessage = false;
Review Comment:
Good idea.
[GitHub] [pulsar] gaoran10 commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction
Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1292074537
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -271,7 +266,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
promise.completeExceptionally(exception2);
}
});
- if (to.equals(id)) {
+ if (lastReadId.equals(id)) {
Review Comment:
Yes, if we don't stop phase two, the next entry may be dropped because it does not exist in the `latestForKey`.
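The effect of the missing `return` can be modelled in a few lines. This is a simplified, synchronous sketch and not the actual `phaseTwoLoop`, which is asynchronous and recursive; `entriesRead` and its `stopAtLast` flag are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class PhaseTwoStopSketch {

    /**
     * Counts how many entries a simplified phase-two loop reads.
     * stopAtLast models a `return` right after the loop reaches lastReadId;
     * without it the loop carries on with entries that phase one never saw.
     */
    static int entriesRead(List<Long> ids, long lastReadId, boolean stopAtLast) {
        int read = 0;
        for (long id : ids) {
            read++;
            if (id == lastReadId && stopAtLast) {
                return read;
            }
        }
        return read;
    }

    public static void main(String[] args) {
        // Entry 3 was published after phase one finished at id 2.
        List<Long> ids = Arrays.asList(0L, 1L, 2L, 3L);
        System.out.println("with return:    " + entriesRead(ids, 2L, true));
        System.out.println("without return: " + entriesRead(ids, 2L, false));
    }
}
```

In the model, the run without the early exit also reads entry 3, which is the entry that would be missing from `latestForKey`.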
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -162,46 +161,42 @@ private void phaseOneLoop(RawReader reader,
                         mxBean.addCompactionRemovedEvent(reader.getTopic());
                     }
                 }
-                MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
-                MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
+                MessageId first = firstMessageId.orElse(id);
                 if (id.compareTo(lastMessageId) == 0) {
-                    loopPromise.complete(new PhaseOneResult(first == null ? id : first, to == null ? id : to,
-                            lastMessageId, latestForKey));
+                    loopPromise.complete(new PhaseOneResult(first, lastMessageId, latestForKey));
                 } else {
                     phaseOneLoop(reader,
-                            Optional.ofNullable(first),
-                            Optional.ofNullable(to),
+                            Optional.of(first),
                             lastMessageId,
                             latestForKey, loopPromise);
                 }
-            } finally {
-                m.close();
             }
         }, scheduler).exceptionally(ex -> {
             loopPromise.completeExceptionally(ex);
             return null;
         });
     }
-    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId,
-            Map<String, MessageId> latestForKey, BookKeeper bk) {
+    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId lastReadId,
Review Comment:
Do we need to remove the param `to`?
[GitHub] [pulsar] Technoboy- commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction
Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1293129942
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -122,27 +122,36 @@ private void phaseOneLoop(RawReader reader,
() -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));
future.thenAcceptAsync(m -> {
- try {
+ try (m) {
MessageId id = m.getMessageId();
boolean deletedMessage = false;
boolean replaceMessage = false;
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
- if (RawBatchConverter.isReadableBatch(m)) {
+ MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
+ if (RawBatchConverter.isReadableBatch(metadata)) {
try {
+ int numMessagesInBatch = metadata.getNumMessagesInBatch();
+ int deleteCnt = 0;
for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
+ boolean singleDeletedMessage = false;
+ boolean singleReplaceMessage = false;
Review Comment:
+1
[GitHub] [pulsar] coderzc commented on a diff in pull request #20980: [fix][broker] Fix message loss during topic compaction
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1291592568
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -271,7 +266,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
promise.completeExceptionally(exception2);
}
});
- if (to.equals(id)) {
+ if (lastReadId.equals(id)) {
Review Comment:
There seems to be a `return` missing at line 287; we may need to open another PR to fix it.