Posted to commits@pulsar.apache.org by "Denovo1998 (via GitHub)" <gi...@apache.org> on 2023/11/28 16:16:36 UTC
[PR] [fix][broker] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Denovo1998 opened a new pull request, #21634:
URL: https://github.com/apache/pulsar/pull/21634
Fixes #21537
### Motivation
The `compactedLedger` in `CompactedTopicImpl` is created asynchronously (via `bk.asyncOpenLedger`), so it may be used before it has been created successfully.
https://github.com/apache/pulsar/blob/e1d06b5f54f08c09debab7a9a513b7c173c1779b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java#L76-L87
https://github.com/apache/pulsar/blob/e1d06b5f54f08c09debab7a9a513b7c173c1779b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java#L239-L253
### Modifications
Wait for `compactedTopicContext` to initialize correctly.
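The race described in the Motivation can be illustrated with a minimal sketch that uses only the JDK. All names here (`AsyncContextSketch`, `Context`, `peek`, `awaitContext`, `completeOpen`) are hypothetical stand-ins and not Pulsar APIs; the sketch only assumes that the context is filled in by an asynchronous callback, as the description above states.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncContextSketch {
    // Hypothetical stand-in for the compacted topic context; not Pulsar's class.
    public static final class Context {
        public final long ledgerId;
        Context(long ledgerId) { this.ledgerId = ledgerId; }
    }

    // Completed later, when the (simulated) asynchronous ledger open finishes.
    private final CompletableFuture<Context> contextFuture = new CompletableFuture<>();

    // Non-blocking read: empty until the asynchronous open has completed.
    public Optional<Context> peek() {
        return Optional.ofNullable(contextFuture.getNow(null));
    }

    // Blocking read: waits up to timeoutMillis for the open to complete.
    public Context awaitContext(long timeoutMillis) {
        try {
            return contextFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("context was not created in time", e);
        }
    }

    // Simulates the callback that fires once the ledger has been opened.
    public void completeOpen(long ledgerId) {
        contextFuture.complete(new Context(ledgerId));
    }

    public static void main(String[] args) {
        AsyncContextSketch topic = new AsyncContextSketch();
        System.out.println("present before open: " + topic.peek().isPresent());
        CompletableFuture.runAsync(() -> topic.completeOpen(42L));
        System.out.println("ledger id after waiting: " + topic.awaitContext(5000).ledgerId);
    }
}
```

A caller that uses `peek()` right after construction sees an empty value, while a caller that waits on the future sees the context once it exists. That is the difference between using the ledger too early and waiting for initialization.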
### Verifying this change
- [x] Make sure that the change passes the CI checks.
### Does this pull request potentially affect one of the following parts:
*If the box was checked, please highlight the changes*
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
### Documentation
- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
### Matching PR in forked repository
PR in forked repository: <!-- ENTER URL HERE -->
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833137235
@Denovo1998 After looking at the code, I found that `getCompactedTopicContext` already waits for `compactedTopicContext` to be created, so we don't seem to need to wait for it in the test.
https://github.com/apache/pulsar/blob/4e7bff7f1d2a5fb5ef04c1e6428178675474ad39/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java#L298-L304
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833303246
> ```java
> 2023-11-30T05:30:45,771 - WARN - [broker-client-shared-internal-executor-4522-1:PersistentTopic@3397] - [persistent://my-property/use/my-ns/testCompactionDuplicate] Compaction failure.
> java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Consumer connect fail! consumer state:Connecting
> at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368) ~[?:?]
> at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377) ~[?:?]
> at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1152) ~[?:?]
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
> at org.apache.pulsar.compaction.TwoPhaseCompactor.lambda$closeLedger$20(TwoPhaseCompactor.java:388) ~[classes/:?]
> at org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncClose$1(PulsarMockLedgerHandle.java:96) ~[testmocks-3.2.0-SNAPSHOT.jar:4.16.3]
> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
> at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[bookkeeper-common-4.16.3.jar:4.16.3]
> at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[bookkeeper-common-4.16.3.jar:4.16.3]
> at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
> at java.lang.Thread.run(Thread.java:840) ~[?:?]
> Caused by: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Consumer connect fail! consumer state:Connecting
> at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doImmediateAck(PersistentAcknowledgmentsGroupingTracker.java:363) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
> at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doCumulativeAck(PersistentAcknowledgmentsGroupingTracker.java:310) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
> at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:243) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
> at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:198) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
> at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:566) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
> at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:688) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
> at org.apache.pulsar.client.impl.RawReaderImpl.acknowledgeCumulativeAsync(RawReaderImpl.java:90) ~[classes/:3.2.0-SNAPSHOT]
> at org.apache.pulsar.compaction.TwoPhaseCompactor.lambda$phaseTwoSeekThenLoop$10(TwoPhaseCompactor.java:225) ~[classes/:?]
> at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150) ~[?:?]
> ```
>
> @Denovo1998 I found the above error logs from the CI report. I think this is the root cause. I think we can trigger compaction again after topic unload. Maybe we can get the future of compaction from persistentTopic and judge whether compaction task has failed, then retrigger it.
@coderzc I get it! So the cause is that `triggerCompactor` didn't succeed, not that `compactionLedger` creation timed out.
We need to unload the topic, make sure the topic is properly initialized, and invoke `persistentTopic.compactionStatus()`. If it reports success, the test continues; if it reports failure, we call `triggerCompactor` again.
If my description is correct, I will update the PR's solution description and code after work.
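A rough sketch of that decision, using only the JDK. The `Status` values mirror the names that appear in the test diff in this thread, but the enum, the `Action` enum, and `decide` are local, hypothetical stand-ins rather than Pulsar classes, and the rule is a simplification of what the test would do on each poll.

```java
public class RetriggerSketch {
    // Local stand-in mirroring the status names used in the test diff.
    public enum Status { NOT_RUN, RUNNING, SUCCESS, ERROR }

    // Hypothetical outcome of one poll after the topic has been unloaded.
    public enum Action { CONTINUE, KEEP_CHECKING, RETRIGGER }

    // previous: status of the compaction task on the topic instance before unload.
    // current:  status reported by the topic instance loaded after unload.
    public static Action decide(Status previous, Status current) {
        // The earlier task failed and nothing has succeeded since: trigger again.
        if (previous == Status.ERROR
                && (current == Status.NOT_RUN || current == Status.ERROR)) {
            return Action.RETRIGGER;
        }
        // Either task finished successfully: the test can move on.
        if (previous == Status.SUCCESS || current == Status.SUCCESS) {
            return Action.CONTINUE;
        }
        // Something is still running (or has not reported yet): poll again.
        return Action.KEEP_CHECKING;
    }

    public static void main(String[] args) {
        System.out.println(decide(Status.ERROR, Status.NOT_RUN));
        System.out.println(decide(Status.RUNNING, Status.NOT_RUN));
        System.out.println(decide(Status.SUCCESS, Status.NOT_RUN));
    }
}
```

Keeping the decision separate from the polling loop makes each branch easy to check on its own; in the real test the loop would presumably be an `Awaitility` block calling `admin.topics().triggerCompaction(topic)` on the retrigger branch.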
Re: [PR] [fix][broker] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1409190326
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,6 +1967,15 @@ public void testCompactionDuplicate() throws Exception {
// Wait for phase one to complete
Thread.sleep(500);
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+ PulsarTopicCompactionService topicCompactionService =
+ (PulsarTopicCompactionService) persistentTopic.getTopicCompactionService();
+ Optional<CompactedTopicContext> compactedTopicContext = topicCompactionService.getCompactedTopic()
+ .getCompactedTopicContext();
+ Assert.assertTrue(compactedTopicContext.isPresent());
+ });
+
Review Comment:
I think it should be moved to after topic unload.
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1411583148
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,9 +1967,35 @@ public void testCompactionDuplicate() throws Exception {
// Wait for phase one to complete
Thread.sleep(500);
+ Optional<Topic> previousTopicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+ Assert.assertTrue(previousTopicRef.isPresent());
+ PersistentTopic previousPersistentTopic = (PersistentTopic) previousTopicRef.get();
+
// Unload topic make reader of compaction reconnect
admin.topics().unload(topic);
+ Awaitility.await().untilAsserted(() -> {
+ LongRunningProcessStatus previousLongRunningProcessStatus = previousPersistentTopic.compactionStatus();
+
+ Optional<Topic> currentTopicReference = pulsar.getBrokerService().getTopicReference(topic);
+ Assert.assertTrue(currentTopicReference.isPresent());
+ PersistentTopic currentPersistentTopic = (PersistentTopic) currentTopicReference.get();
+ LongRunningProcessStatus currentLongRunningProcessStatus = currentPersistentTopic.compactionStatus();
+
+ if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR
+ && (currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.NOT_RUN
+ || currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR)) {
+ // trigger compaction again
+ admin.topics().triggerCompaction(topic);
+ Assert.assertEquals(currentLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);
+ } else if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.RUNNING
+ || previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.SUCCESS) {
+ Assert.assertEquals(previousLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);
Review Comment:
This assertion always seems to be false.
Re: [PR] [fix][test] Fix the bug caused by unload topic and compaction task failure after task is triggered [pulsar]
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1835723345
/pulsarbot rerun-failure-checks
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1410161409
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1970,6 +1970,17 @@ public void testCompactionDuplicate() throws Exception {
// Unload topic make reader of compaction reconnect
admin.topics().unload(topic);
+ Optional<Topic> topicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+ Assert.assertTrue(topicRef.isPresent());
+ PersistentTopic persistentTopic = (PersistentTopic) topicRef.get();
Review Comment:
The same handling is used here:
https://github.com/apache/pulsar/blob/4e7bff7f1d2a5fb5ef04c1e6428178675474ad39/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java#L407-L422
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833931968
@coderzc Yes, I just read the code and got it. Let's look at the second and third questions.
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1835335061
> > we also need to fix it in StrategicCompactionTest.testCompactionDuplicate. just retrigger compaction if the previous task failed after topic was unloaded.
>
> @coderzc Do you mean to create a new test case in `org.apache.pulsar.compaction.StrategicCompactionTest`?
>
> **All has been changed. Please check again.**
Oh, there's no need to do that. `StrategicCompactionTest` already extends `CompactionTest`.
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1409326536
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,6 +1967,15 @@ public void testCompactionDuplicate() throws Exception {
// Wait for phase one to complete
Thread.sleep(500);
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+ PulsarTopicCompactionService topicCompactionService =
+ (PulsarTopicCompactionService) persistentTopic.getTopicCompactionService();
+ Optional<CompactedTopicContext> compactedTopicContext = topicCompactionService.getCompactedTopic()
+ .getCompactedTopicContext();
+ Assert.assertTrue(compactedTopicContext.isPresent());
+ });
+
Review Comment:
This has been fixed!
Re: [PR] [fix][test] Fix the bug caused by unload topic and compaction task failure after task is triggered [pulsar]
Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1411666444
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,9 +1967,35 @@ public void testCompactionDuplicate() throws Exception {
// Wait for phase one to complete
Thread.sleep(500);
+ Optional<Topic> previousTopicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+ Assert.assertTrue(previousTopicRef.isPresent());
+ PersistentTopic previousPersistentTopic = (PersistentTopic) previousTopicRef.get();
+
// Unload topic make reader of compaction reconnect
admin.topics().unload(topic);
+ Awaitility.await().untilAsserted(() -> {
+ LongRunningProcessStatus previousLongRunningProcessStatus = previousPersistentTopic.compactionStatus();
+
+ Optional<Topic> currentTopicReference = pulsar.getBrokerService().getTopicReference(topic);
+ Assert.assertTrue(currentTopicReference.isPresent());
+ PersistentTopic currentPersistentTopic = (PersistentTopic) currentTopicReference.get();
+ LongRunningProcessStatus currentLongRunningProcessStatus = currentPersistentTopic.compactionStatus();
+
+ if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR
+ && (currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.NOT_RUN
+ || currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR)) {
+ // trigger compaction again
+ admin.topics().triggerCompaction(topic);
+ Assert.assertEquals(currentLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);
Review Comment:
done.
Re: [PR] [fix][test] Fix the bug caused by unload topic and compaction task failure after task is triggered [pulsar]
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc merged PR #21634:
URL: https://github.com/apache/pulsar/pull/21634
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1832336269
@coderzc @Technoboy- The CI checks reported an error just now; it has been fixed. Please check again.
<img width="1483" alt="CleanShot 2023-11-29 at 10 51 02@2x" src="https://github.com/apache/pulsar/assets/54846009/4ccb0f87-bae3-4d5e-8055-edc0a49c7025">
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833320134
> @coderzc I get it! So the cause is that triggerCompactor didn't succeed, not that compactionLedger creation timed out.
> We need to unload the topic, make sure the topic properly initialized, and invoke persistentTopic.compactionStatus(), if success is to continue. If that fails, call triggerCompactor again.
> If my description is correct, I will modify the solution description and code of PR after work.
Yes, I think so.
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833950017
@coderzc So the current code in this PR waits for the `CompactedTopicContext` to be created successfully, which resolves this issue.
As for the error report in item 3, it seems to be the problem described in [issue21536](https://github.com/apache/pulsar/issues/21536)?
Re: [PR] [fix][test] Fix the bug caused by unload topic and compaction task failure after task is triggered [pulsar]
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1411656409
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,9 +1967,35 @@ public void testCompactionDuplicate() throws Exception {
// Wait for phase one to complete
Thread.sleep(500);
+ Optional<Topic> previousTopicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+ Assert.assertTrue(previousTopicRef.isPresent());
+ PersistentTopic previousPersistentTopic = (PersistentTopic) previousTopicRef.get();
+
// Unload topic make reader of compaction reconnect
admin.topics().unload(topic);
+ Awaitility.await().untilAsserted(() -> {
+ LongRunningProcessStatus previousLongRunningProcessStatus = previousPersistentTopic.compactionStatus();
+
+ Optional<Topic> currentTopicReference = pulsar.getBrokerService().getTopicReference(topic);
+ Assert.assertTrue(currentTopicReference.isPresent());
+ PersistentTopic currentPersistentTopic = (PersistentTopic) currentTopicReference.get();
+ LongRunningProcessStatus currentLongRunningProcessStatus = currentPersistentTopic.compactionStatus();
+
+ if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR
+ && (currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.NOT_RUN
+ || currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR)) {
+ // trigger compaction again
+ admin.topics().triggerCompaction(topic);
+ Assert.assertEquals(currentLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);
Review Comment:
If the previous task is still running, we need to keep checking
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1410155628
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1970,6 +1970,17 @@ public void testCompactionDuplicate() throws Exception {
// Unload topic make reader of compaction reconnect
admin.topics().unload(topic);
+ Optional<Topic> topicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+ Assert.assertTrue(topicRef.isPresent());
+ PersistentTopic persistentTopic = (PersistentTopic) topicRef.get();
Review Comment:
In fact, we must wait for the topic to exist before we can continue. If the topic still does not exist after `Awaitility` has polled for more than 10 seconds, an error is still reported.
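To make the polling behaviour concrete, here is a small JDK-only sketch of "retry a check until it passes or a timeout expires". `AwaitSketch` and `awaitUntil` are hypothetical names for illustration; this is not Awaitility's implementation, and unlike Awaitility (which fails with an exception on timeout) the helper simply returns `false`.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class AwaitSketch {
    // Polls the condition until it returns true or the timeout elapses.
    // Returns true if the condition was met, false on timeout or interruption.
    public static boolean awaitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Simulated "topic exists" check that only passes on the third poll.
        AtomicInteger polls = new AtomicInteger();
        boolean found = awaitUntil(() -> polls.incrementAndGet() >= 3,
                Duration.ofSeconds(10), Duration.ofMillis(100));
        System.out.println("topic found: " + found + " after " + polls.get() + " polls");
    }
}
```

With this shape, a topic that reappears shortly after the unload lets the test proceed on an early poll, while a topic that never reappears produces a failure once the time budget is used up.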
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833973085
> As for the error report in number 3, it seems to be the problem of https://github.com/apache/pulsar/issues/21536?
Why? We also need to fix it in `StrategicCompactionTest.testCompactionDuplicate`.
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1835406944
> Oh, Don't need to do that., StrategicCompactionTest already extends from CompactionTest.
@coderzc I'm sorry, I misunderstood what you said. I am still learning Pulsar's code structure and am not yet very familiar with it. The modification to `org.apache.pulsar.compaction.StrategicCompactionTest` has been removed. Please check again.
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833830378
@coderzc Something strange.
**1. Calling `persistentTopic.compactionStatus()` after unloading the topic always returns a `longRunningProcessStatus` of `NOT_RUN`.**
<img width="600" alt="CleanShot 2023-11-30 at 9 10 48@2x" src="https://github.com/apache/pulsar/assets/54846009/8f50b4bd-ac10-430b-b14f-14559b567a94">
<img width="600" alt="CleanShot 2023-11-30 at 9 12 09@2x" src="https://github.com/apache/pulsar/assets/54846009/9bb2db60-72e5-4e73-afa1-551f1048f103">
**2. Even if the topic already exists, the `compactedTopicContext` is not created correctly.**
<img width="600" alt="CleanShot 2023-11-30 at 9 07 18@2x" src="https://github.com/apache/pulsar/assets/54846009/a41b80ac-a8b6-4231-995a-4ee67bc6b100">
<img width="600" alt="CleanShot 2023-11-30 at 9 07 35@2x" src="https://github.com/apache/pulsar/assets/54846009/80f93cd4-fbcc-4759-b2cf-05e3310b606f">
The problem still seems to be the `compactedTopicContext`: the `compactedLedger` hasn't been created yet (`org.apache.pulsar.compaction.CompactedTopicImpl#openCompactedLedger`).
```java
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false);
```
https://github.com/apache/pulsar/blob/3377003dbe0f53b84298c5c26acbacd2b07c30e2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L2455-L2469
**3. The error you sent earlier appears to be this issue. Does the consumer with the subscription name `__compaction` fail to reconnect?**
https://github.com/apache/pulsar/issues/21536
![CleanShot 2023-11-30 at 9 41 32@2x](https://github.com/apache/pulsar/assets/54846009/9d86b83b-2216-4654-8627-b3f285cd9547)
Re: [PR] [fix][test] Fix the bug caused by unload topic and compaction task failure after task is triggered [pulsar]
Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1835809319
## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/21634?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
> Merging [#21634](https://app.codecov.io/gh/apache/pulsar/pull/21634?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (5ae340f) into [master](https://app.codecov.io/gh/apache/pulsar/commit/e1d06b5f54f08c09debab7a9a513b7c173c1779b?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (e1d06b5) will **increase** coverage by `36.49%`.
> Report is 12 commits behind head on master.
> The diff coverage is `n/a`.
<details><summary>Additional details and impacted files</summary>
[![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/21634/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/21634?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
```diff
@@ Coverage Diff @@
## master #21634 +/- ##
=============================================
+ Coverage 36.83% 73.33% +36.49%
- Complexity 376 32740 +32364
=============================================
Files 1715 1893 +178
Lines 131097 140730 +9633
Branches 14318 15500 +1182
=============================================
+ Hits 48288 103201 +54913
+ Misses 76424 29421 -47003
- Partials 6385 8108 +1723
```
| [Flag](https://app.codecov.io/gh/apache/pulsar/pull/21634/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [inttests](https://app.codecov.io/gh/apache/pulsar/pull/21634/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `24.15% <ø> (-0.09%)` | :arrow_down: |
| [systests](https://app.codecov.io/gh/apache/pulsar/pull/21634/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `24.66% <ø> (-0.26%)` | :arrow_down: |
| [unittests](https://app.codecov.io/gh/apache/pulsar/pull/21634/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `72.66% <ø> (+40.79%)` | :arrow_up: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
[see 1460 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/21634/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
</details>
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833918250
@Denovo1998
> 1. Call persistentTopic.compactionStatus() after unload topic, longRunningProcessStatus always NOT_RUN
That is because `persistentTopic.compactionStatus()` is reset after unloading, so we cannot get the running status of the previous task through it.
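A toy model of why the status looks reset, assuming the compaction status lives on the in-memory topic instance and that unload simply drops that instance. `ToyTopic` and `ToyBroker` are hypothetical stand-ins, not Pulsar classes; the sketch only illustrates why the test has to keep a reference to the pre-unload topic if it wants to see the old task's status.

```java
import java.util.HashMap;
import java.util.Map;

public class UnloadResetSketch {
    enum Status { NOT_RUN, RUNNING, SUCCESS, ERROR }

    // Hypothetical topic: the compaction status is a field of the instance.
    static final class ToyTopic {
        Status compactionStatus = Status.NOT_RUN;
    }

    // Hypothetical broker: unload drops the instance, the next lookup builds a fresh one.
    static final class ToyBroker {
        private final Map<String, ToyTopic> topics = new HashMap<>();

        ToyTopic getOrLoad(String name) {
            return topics.computeIfAbsent(name, n -> new ToyTopic());
        }

        void unload(String name) {
            topics.remove(name);
        }
    }
}
```

With this model, a reference taken before `unload` still reports the old status, while a lookup after `unload` returns a new instance whose status is `NOT_RUN`.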
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833937914
> Even if the topic already exists, the compactedTopicContext is not created correctly.
Maybe the compaction task isn't finished yet? `CompactedTopicContext` is created only after the compaction task is completed.
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833158125
> @Denovo1998 After looking at the code, I found that `getCompactedTopicContext` method will wait for compactedTopicContext to be created, so we don’t seem to need to wait for `compactedLedger` to be created in the test. I think we just need to wait for topic initialization to complete.

After topic initialization completes, we still have to wait for the `CompactedTopicContext` to be created. Creating the `CompactedTopicContext` requires the `compactedLedger` to exist; otherwise `ledgerId` will be -1, and an error is reported if the ledger has not been created after 10s. See:
https://github.com/apache/pulsar/blob/4e7bff7f1d2a5fb5ef04c1e6428178675474ad39/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java#L298-L304
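To make the waiting behavior concrete, here is a minimal standalone sketch, assuming the context is held in a future that is null until an open has been requested and that readers block on it with a timeout. `ContextWaitSketch`, `startOpen`, `getLedgerId` and `ledgerIdOrMinusOne` are hypothetical names, and the future carries only a ledger id instead of a real context; this is not the code at the link above.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ContextWaitSketch {
    // Stays null until an open of the compacted ledger has been requested.
    private volatile CompletableFuture<Long> ledgerIdFuture;

    // Called when the asynchronous open starts; the future completes when the ledger exists.
    void startOpen(CompletableFuture<Long> pendingOpen) {
        this.ledgerIdFuture = pendingOpen;
    }

    // Empty if no open was requested; otherwise blocks up to the timeout for the result.
    Optional<Long> getLedgerId(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<Long> f = ledgerIdFuture;
        if (f == null) {
            return Optional.empty();
        }
        return Optional.of(f.get(timeout, unit));
    }

    // Stats-style accessor: reports -1 when the ledger is missing or not ready in time.
    long ledgerIdOrMinusOne(long timeoutMillis) {
        try {
            return getLedgerId(timeoutMillis, TimeUnit.MILLISECONDS).orElse(-1L);
        } catch (TimeoutException | ExecutionException e) {
            return -1L;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1L;
        }
    }
}
```

Under these assumptions, a test that reads the ledger id right after the topic is loaded can still see -1, which is why it also has to wait for the pending open to finish.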
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833205492
```java
2023-11-30T05:30:45,771 - WARN - [broker-client-shared-internal-executor-4522-1:PersistentTopic@3397] - [persistent://my-property/use/my-ns/testCompactionDuplicate] Compaction failure.
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Consumer connect fail! consumer state:Connecting
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368) ~[?:?]
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377) ~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1152) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
at org.apache.pulsar.compaction.TwoPhaseCompactor.lambda$closeLedger$20(TwoPhaseCompactor.java:388) ~[classes/:?]
at org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncClose$1(PulsarMockLedgerHandle.java:96) ~[testmocks-3.2.0-SNAPSHOT.jar:4.16.3]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[bookkeeper-common-4.16.3.jar:4.16.3]
at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[bookkeeper-common-4.16.3.jar:4.16.3]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Consumer connect fail! consumer state:Connecting
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doImmediateAck(PersistentAcknowledgmentsGroupingTracker.java:363) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doCumulativeAck(PersistentAcknowledgmentsGroupingTracker.java:310) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:243) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:198) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:566) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:688) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.apache.pulsar.client.impl.RawReaderImpl.acknowledgeCumulativeAsync(RawReaderImpl.java:90) ~[classes/:3.2.0-SNAPSHOT]
at org.apache.pulsar.compaction.TwoPhaseCompactor.lambda$phaseTwoSeekThenLoop$10(TwoPhaseCompactor.java:225) ~[classes/:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150) ~[?:?]
```
@Denovo1998 I found the above error logs in the CI report, and I think this is the root cause. We can trigger compaction again after the topic is unloaded.
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1410150884
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1970,6 +1970,17 @@ public void testCompactionDuplicate() throws Exception {
// Unload topic make reader of compaction reconnect
admin.topics().unload(topic);
+ Optional<Topic> topicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+ Assert.assertTrue(topicRef.isPresent());
+ PersistentTopic persistentTopic = (PersistentTopic) topicRef.get();
Review Comment:
Better to move this into the `Awaitility` block.
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1834330902
> we also need to fix it in StrategicCompactionTest.testCompactionDuplicate. just retrigger compaction if the previous task failed after topic was unloaded.
@coderzc Do you mean to create a new test case in `org.apache.pulsar.compaction.StrategicCompactionTest`?
**Everything has been changed. Please check again.**
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1411583148
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,9 +1967,35 @@ public void testCompactionDuplicate() throws Exception {
// Wait for phase one to complete
Thread.sleep(500);
+ Optional<Topic> previousTopicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+ Assert.assertTrue(previousTopicRef.isPresent());
+ PersistentTopic previousPersistentTopic = (PersistentTopic) previousTopicRef.get();
+
// Unload topic make reader of compaction reconnect
admin.topics().unload(topic);
+ Awaitility.await().untilAsserted(() -> {
+ LongRunningProcessStatus previousLongRunningProcessStatus = previousPersistentTopic.compactionStatus();
+
+ Optional<Topic> currentTopicReference = pulsar.getBrokerService().getTopicReference(topic);
+ Assert.assertTrue(currentTopicReference.isPresent());
+ PersistentTopic currentPersistentTopic = (PersistentTopic) currentTopicReference.get();
+ LongRunningProcessStatus currentLongRunningProcessStatus = currentPersistentTopic.compactionStatus();
+
+ if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR
+ && (currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.NOT_RUN
+ || currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR)) {
+ // trigger compaction again
+ admin.topics().triggerCompaction(topic);
+ Assert.assertEquals(currentLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);
+ } else if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.RUNNING
+ || previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.SUCCESS) {
+ Assert.assertEquals(previousLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);
Review Comment:
This assertion always seems to be false.
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,9 +1967,35 @@ public void testCompactionDuplicate() throws Exception {
// Wait for phase one to complete
Thread.sleep(500);
+ Optional<Topic> previousTopicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+ Assert.assertTrue(previousTopicRef.isPresent());
+ PersistentTopic previousPersistentTopic = (PersistentTopic) previousTopicRef.get();
+
// Unload topic make reader of compaction reconnect
admin.topics().unload(topic);
+ Awaitility.await().untilAsserted(() -> {
+ LongRunningProcessStatus previousLongRunningProcessStatus = previousPersistentTopic.compactionStatus();
+
+ Optional<Topic> currentTopicReference = pulsar.getBrokerService().getTopicReference(topic);
+ Assert.assertTrue(currentTopicReference.isPresent());
+ PersistentTopic currentPersistentTopic = (PersistentTopic) currentTopicReference.get();
+ LongRunningProcessStatus currentLongRunningProcessStatus = currentPersistentTopic.compactionStatus();
+
+ if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR
+ && (currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.NOT_RUN
+ || currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR)) {
+ // trigger compaction again
+ admin.topics().triggerCompaction(topic);
+ Assert.assertEquals(currentLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);
Review Comment:
I think we can just return here.
Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]
Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1411596166
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,9 +1967,35 @@ public void testCompactionDuplicate() throws Exception {
// Wait for phase one to complete
Thread.sleep(500);
+ Optional<Topic> previousTopicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+ Assert.assertTrue(previousTopicRef.isPresent());
+ PersistentTopic previousPersistentTopic = (PersistentTopic) previousTopicRef.get();
+
// Unload topic make reader of compaction reconnect
admin.topics().unload(topic);
+ Awaitility.await().untilAsserted(() -> {
+ LongRunningProcessStatus previousLongRunningProcessStatus = previousPersistentTopic.compactionStatus();
+
+ Optional<Topic> currentTopicReference = pulsar.getBrokerService().getTopicReference(topic);
+ Assert.assertTrue(currentTopicReference.isPresent());
+ PersistentTopic currentPersistentTopic = (PersistentTopic) currentTopicReference.get();
+ LongRunningProcessStatus currentLongRunningProcessStatus = currentPersistentTopic.compactionStatus();
+
+ if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR
+ && (currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.NOT_RUN
+ || currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR)) {
+ // trigger compaction again
+ admin.topics().triggerCompaction(topic);
+ Assert.assertEquals(currentLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);
Review Comment:
You are right, there are two situations:
```java
1. previous task error, and then trigger compaction again
(previousLongRunningProcessStatus:ERROR, currentLongRunningProcessStatus:NOT_RUN -> RUNNING -> SUCCESS)
2. previous task success
(previousLongRunningProcessStatus:RUNNING -> SUCCESS, currentLongRunningProcessStatus:NOT_RUN)
```
Done.
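The two situations can be written down as a pair of small predicates. This is only a sketch: `RetriggerDecisionSketch` and its methods are hypothetical helpers, and the local `Status` enum merely mirrors the four status names used in the diff rather than importing the real `LongRunningProcessStatus`.

```java
public class RetriggerDecisionSketch {
    enum Status { NOT_RUN, RUNNING, SUCCESS, ERROR }

    // Situation 1: the pre-unload task failed and the reloaded topic has nothing
    // running or finished, so compaction needs to be triggered again.
    static boolean shouldRetrigger(Status previous, Status current) {
        return previous == Status.ERROR
                && (current == Status.NOT_RUN || current == Status.ERROR);
    }

    // The test can move on once either the pre-unload task (situation 2)
    // or the retriggered task (end of situation 1) reports SUCCESS.
    static boolean compactionSucceeded(Status previous, Status current) {
        return previous == Status.SUCCESS || current == Status.SUCCESS;
    }
}
```

Inside an `Awaitility` block, the first predicate would decide whether to call `triggerCompaction` and return, and the second would be the condition that is finally asserted.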