Posted to commits@pulsar.apache.org by "Denovo1998 (via GitHub)" <gi...@apache.org> on 2023/11/28 16:16:36 UTC

[PR] [fix][broker] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Denovo1998 opened a new pull request, #21634:
URL: https://github.com/apache/pulsar/pull/21634

   Fixes #21537
   
   ### Motivation
   The `compactedLedger` in `CompactedTopicImpl` is opened asynchronously (via `bk.asyncOpenLedger`), so it may be used before it has been created successfully.
   
   https://github.com/apache/pulsar/blob/e1d06b5f54f08c09debab7a9a513b7c173c1779b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java#L76-L87
   
   https://github.com/apache/pulsar/blob/e1d06b5f54f08c09debab7a9a513b7c173c1779b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java#L239-L253
   
   ### Modifications
   Wait for `compactedTopicContext` to initialize correctly.
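
   The race described under Motivation can be illustrated with a minimal, self-contained sketch. The names `LedgerHolder`, `completeOpen`, `peek`, and `await` are hypothetical and are not taken from `CompactedTopicImpl`; the sketch only assumes that the value is produced by an asynchronous callback, and shows why a non-blocking read may see nothing while a read that waits on the future does not.

   ```java
   import java.util.Optional;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   // Hypothetical holder used for illustration; not the real CompactedTopicImpl.
   class LedgerHolder {
       // Completed by whichever thread finishes the asynchronous open.
       private final CompletableFuture<String> ledger = new CompletableFuture<>();

       // Stands in for the callback of an asynchronous open call.
       void completeOpen(String handle) {
           ledger.complete(handle);
       }

       // Non-blocking read: empty until the open has completed.
       Optional<String> peek() {
           return Optional.ofNullable(ledger.getNow(null));
       }

       // Blocking read: waits up to the given timeout for the open to complete.
       String await(long timeout, TimeUnit unit) {
           try {
               return ledger.get(timeout, unit);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException("interrupted while waiting", e);
           } catch (ExecutionException | TimeoutException e) {
               throw new IllegalStateException("ledger was not opened in time", e);
           }
       }
   }
   ```

   A caller that uses `peek()` right after starting the open can observe an empty result, whereas `await(...)` either returns the handle or fails with an explicit error once the timeout expires.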
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Does this pull request potentially affect one of the following parts:
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: <!-- ENTER URL HERE -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833137235

   @Denovo1998 After looking at the code, I found that `getCompactedTopicContext` already waits for `compactedTopicContext` to be created, so we don't seem to need to wait for it in the test.
   
   https://github.com/apache/pulsar/blob/4e7bff7f1d2a5fb5ef04c1e6428178675474ad39/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java#L298-L304


Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833303246

   > ```java
   > 2023-11-30T05:30:45,771 - WARN  - [broker-client-shared-internal-executor-4522-1:PersistentTopic@3397] - [persistent://my-property/use/my-ns/testCompactionDuplicate] Compaction failure.
   > java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Consumer connect fail! consumer state:Connecting
   > 	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368) ~[?:?]
   > 	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377) ~[?:?]
   > 	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1152) ~[?:?]
   > 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
   > 	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
   > 	at org.apache.pulsar.compaction.TwoPhaseCompactor.lambda$closeLedger$20(TwoPhaseCompactor.java:388) ~[classes/:?]
   > 	at org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncClose$1(PulsarMockLedgerHandle.java:96) ~[testmocks-3.2.0-SNAPSHOT.jar:4.16.3]
   > 	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
   > 	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
   > 	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
   > 	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[bookkeeper-common-4.16.3.jar:4.16.3]
   > 	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[bookkeeper-common-4.16.3.jar:4.16.3]
   > 	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
   > 	at java.lang.Thread.run(Thread.java:840) ~[?:?]
   > Caused by: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Consumer connect fail! consumer state:Connecting
   > 	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doImmediateAck(PersistentAcknowledgmentsGroupingTracker.java:363) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
   > 	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doCumulativeAck(PersistentAcknowledgmentsGroupingTracker.java:310) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
   > 	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:243) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
   > 	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:198) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
   > 	at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:566) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
   > 	at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:688) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
   > 	at org.apache.pulsar.client.impl.RawReaderImpl.acknowledgeCumulativeAsync(RawReaderImpl.java:90) ~[classes/:3.2.0-SNAPSHOT]
   > 	at org.apache.pulsar.compaction.TwoPhaseCompactor.lambda$phaseTwoSeekThenLoop$10(TwoPhaseCompactor.java:225) ~[classes/:?]
   > 	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150) ~[?:?]
   > ```
   > 
   > @Denovo1998 I found the above error logs from the CI report. I think this is the root cause. I think we can trigger compaction again after topic unload. Maybe we can get the future of compaction from persistentTopic and judge whether compaction task has failed, then retrigger it.
   
   @coderzc I get it! So the cause is that `triggerCompactor` didn't succeed, not that `compactionLedger` creation timed out. 
   We need to unload the topic, make sure the topic is properly initialized, and then invoke `persistentTopic.compactionStatus()`. If compaction succeeded, the test continues; if it failed, we call `triggerCompactor` again. 
   If my description is correct, I will update the PR description and the code after work.
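
   The plan above can be sketched as a small decision function. This is only an illustration under stated assumptions: `CompactionRetry`, its nested `Status` enum (mirroring the `NOT_RUN`, `RUNNING`, `SUCCESS`, and `ERROR` values that appear in the test diff in this thread), and the `Action` enum are hypothetical, and the mapping from statuses to actions is one reading of the discussion, not the code that was merged.

   ```java
   // Hypothetical helper for illustration; not part of Pulsar.
   class CompactionRetry {
       // Assumed to mirror the status values seen in the test diff.
       enum Status { NOT_RUN, RUNNING, SUCCESS, ERROR }

       // What the test would do next.
       enum Action { CONTINUE, KEEP_WAITING, RETRIGGER }

       // previous: status reported by the topic instance from before the unload.
       // current:  status reported by the topic instance loaded after the unload.
       static Action decide(Status previous, Status current) {
           if (previous == Status.SUCCESS || current == Status.SUCCESS) {
               return Action.CONTINUE; // a compaction run has finished successfully
           }
           if (previous == Status.ERROR
                   && (current == Status.NOT_RUN || current == Status.ERROR)) {
               return Action.RETRIGGER; // the earlier run failed and nothing has replaced it
           }
           return Action.KEEP_WAITING; // something may still be running
       }
   }
   ```

   Keeping the decision separate from the polling makes it easier to see which status combinations lead to a retrigger and which ones only require more waiting.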


Re: [PR] [fix][broker] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1409190326


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,6 +1967,15 @@ public void testCompactionDuplicate() throws Exception {
         // Wait for phase one to complete
         Thread.sleep(500);
 
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+            PulsarTopicCompactionService topicCompactionService =
+                    (PulsarTopicCompactionService) persistentTopic.getTopicCompactionService();
+            Optional<CompactedTopicContext> compactedTopicContext = topicCompactionService.getCompactedTopic()
+                    .getCompactedTopicContext();
+            Assert.assertTrue(compactedTopicContext.isPresent());
+        });
+

Review Comment:
   I think this block should be moved to after the topic unload.
   



Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1411583148


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,9 +1967,35 @@ public void testCompactionDuplicate() throws Exception {
         // Wait for phase one to complete
         Thread.sleep(500);
 
+        Optional<Topic> previousTopicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+        Assert.assertTrue(previousTopicRef.isPresent());
+        PersistentTopic previousPersistentTopic = (PersistentTopic) previousTopicRef.get();
+
         // Unload topic make reader of compaction reconnect
         admin.topics().unload(topic);
 
+        Awaitility.await().untilAsserted(() -> {
+            LongRunningProcessStatus previousLongRunningProcessStatus = previousPersistentTopic.compactionStatus();
+
+            Optional<Topic> currentTopicReference = pulsar.getBrokerService().getTopicReference(topic);
+            Assert.assertTrue(currentTopicReference.isPresent());
+            PersistentTopic currentPersistentTopic = (PersistentTopic) currentTopicReference.get();
+            LongRunningProcessStatus currentLongRunningProcessStatus = currentPersistentTopic.compactionStatus();
+
+            if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR
+                    && (currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.NOT_RUN
+                    || currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR)) {
+                // trigger compaction again
+                admin.topics().triggerCompaction(topic);
+                Assert.assertEquals(currentLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);
+            } else if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.RUNNING
+                    || previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.SUCCESS) {
+                Assert.assertEquals(previousLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);

Review Comment:
   This assertion always seems to be false.



Re: [PR] [fix][test] Fix the bug caused by unload topic and compaction task failure after task is triggered [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1835723345

   /pulsarbot rerun-failure-checks


Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1410161409


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1970,6 +1970,17 @@ public void testCompactionDuplicate() throws Exception {
         // Unload topic make reader of compaction reconnect
         admin.topics().unload(topic);
 
+        Optional<Topic> topicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+        Assert.assertTrue(topicRef.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) topicRef.get();

Review Comment:
   Same processing here:
   https://github.com/apache/pulsar/blob/4e7bff7f1d2a5fb5ef04c1e6428178675474ad39/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java#L407-L422



Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833931968

   @coderzc Yes, I just read the code and got it. Let's look at the second and third questions.


Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1835335061

   > > we also need to fix it in StrategicCompactionTest.testCompactionDuplicate. just retrigger compaction if the previous task failed after topic was unloaded.
   > 
   > @coderzc Do you mean to create a new test case in `org.apache.pulsar.compaction.StrategicCompactionTest`?
   > 
   > **All has been changed. Please check again.**
   
   Oh, there's no need to do that. `StrategicCompactionTest` already extends `CompactionTest`.
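
   The point about inheritance can be shown with a minimal sketch: a test method defined in a base class is inherited by its subclass, so a fix made in the base class also applies when the subclass runs. The classes `BaseCompactionChecks` and `StrategicChecks` and the method `compactorName` are hypothetical stand-ins; nothing here is claimed about the real test classes beyond the `extends` relationship mentioned above.

   ```java
   // Hypothetical stand-in for a base test class.
   class BaseCompactionChecks {
       // The shared test body lives here, so a fix here reaches every subclass.
       public String testCompactionDuplicate() {
           return "ran against " + compactorName();
       }

       // Subclasses override this hook to swap the component under test.
       protected String compactorName() {
           return "default";
       }
   }

   // Hypothetical stand-in for a subclass that reuses the inherited test.
   class StrategicChecks extends BaseCompactionChecks {
       @Override
       protected String compactorName() {
           return "strategic";
       }
   }
   ```

   Calling `new StrategicChecks().testCompactionDuplicate()` executes the method body defined in the base class, with only the overridden hook differing.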


Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1409326536


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,6 +1967,15 @@ public void testCompactionDuplicate() throws Exception {
         // Wait for phase one to complete
         Thread.sleep(500);
 
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+            PulsarTopicCompactionService topicCompactionService =
+                    (PulsarTopicCompactionService) persistentTopic.getTopicCompactionService();
+            Optional<CompactedTopicContext> compactedTopicContext = topicCompactionService.getCompactedTopic()
+                    .getCompactedTopicContext();
+            Assert.assertTrue(compactedTopicContext.isPresent());
+        });
+

Review Comment:
   has been fixed!



Re: [PR] [fix][test] Fix the bug caused by unload topic and compaction task failure after task is triggered [pulsar]

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1411666444


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,9 +1967,35 @@ public void testCompactionDuplicate() throws Exception {
         // Wait for phase one to complete
         Thread.sleep(500);
 
+        Optional<Topic> previousTopicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+        Assert.assertTrue(previousTopicRef.isPresent());
+        PersistentTopic previousPersistentTopic = (PersistentTopic) previousTopicRef.get();
+
         // Unload topic make reader of compaction reconnect
         admin.topics().unload(topic);
 
+        Awaitility.await().untilAsserted(() -> {
+            LongRunningProcessStatus previousLongRunningProcessStatus = previousPersistentTopic.compactionStatus();
+
+            Optional<Topic> currentTopicReference = pulsar.getBrokerService().getTopicReference(topic);
+            Assert.assertTrue(currentTopicReference.isPresent());
+            PersistentTopic currentPersistentTopic = (PersistentTopic) currentTopicReference.get();
+            LongRunningProcessStatus currentLongRunningProcessStatus = currentPersistentTopic.compactionStatus();
+
+            if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR
+                    && (currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.NOT_RUN
+                    || currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR)) {
+                // trigger compaction again
+                admin.topics().triggerCompaction(topic);
+                Assert.assertEquals(currentLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);

Review Comment:
   done.



Re: [PR] [fix][test] Fix the bug caused by unload topic and compaction task failure after task is triggered [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc merged PR #21634:
URL: https://github.com/apache/pulsar/pull/21634


Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1832336269

   @coderzc @Technoboy- The CI checks reported an error just now; it has been fixed. Please check again.
   
   <img width="1483" alt="CleanShot 2023-11-29 at 10 51 02@2x" src="https://github.com/apache/pulsar/assets/54846009/4ccb0f87-bae3-4d5e-8055-edc0a49c7025">
   


Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833320134

   > @coderzc I get it! So the cause is that triggerCompactor didn't succeed, not that compactionLedger creation timed out.
   > We need to unload the topic, make sure the topic properly initialized, and invoke persistentTopic.compactionStatus(), if success is to continue. If that fails, call triggerCompactor again.
   > If my description is correct, I will modify the solution description and code of PR after work.
   
   Yes, I think so. 


Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833950017

   @coderzc So the current code in this PR waits for the `CompactedTopicContext` to be created successfully, which solves this issue. 
   As for the error report in item 3, it seems to be the problem described in [issue21536](https://github.com/apache/pulsar/issues/21536)?


Re: [PR] [fix][test] Fix the bug caused by unload topic and compaction task failure after task is triggered [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1411656409


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,9 +1967,35 @@ public void testCompactionDuplicate() throws Exception {
         // Wait for phase one to complete
         Thread.sleep(500);
 
+        Optional<Topic> previousTopicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+        Assert.assertTrue(previousTopicRef.isPresent());
+        PersistentTopic previousPersistentTopic = (PersistentTopic) previousTopicRef.get();
+
         // Unload topic make reader of compaction reconnect
         admin.topics().unload(topic);
 
+        Awaitility.await().untilAsserted(() -> {
+            LongRunningProcessStatus previousLongRunningProcessStatus = previousPersistentTopic.compactionStatus();
+
+            Optional<Topic> currentTopicReference = pulsar.getBrokerService().getTopicReference(topic);
+            Assert.assertTrue(currentTopicReference.isPresent());
+            PersistentTopic currentPersistentTopic = (PersistentTopic) currentTopicReference.get();
+            LongRunningProcessStatus currentLongRunningProcessStatus = currentPersistentTopic.compactionStatus();
+
+            if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR
+                    && (currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.NOT_RUN
+                    || currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR)) {
+                // trigger compaction again
+                admin.topics().triggerCompaction(topic);
+                Assert.assertEquals(currentLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);

Review Comment:
   If the previous task is still running, we need to keep checking.
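
   One way to "keep checking" is to poll the status until it leaves the running state or a deadline passes. The sketch below uses plain Java rather than Awaitility so that it stays self-contained; `StatusPoller`, its nested `Status` enum, and `awaitTerminal` are hypothetical names, and the status source is supplied by the caller rather than read from a real topic.

   ```java
   import java.util.function.Supplier;

   // Hypothetical polling helper for illustration; not part of Pulsar or Awaitility.
   class StatusPoller {
       // Assumed to mirror the status values seen in the test diff.
       enum Status { NOT_RUN, RUNNING, SUCCESS, ERROR }

       // Re-reads the status while it is RUNNING, until the timeout elapses.
       // Returns the last status observed, which is still RUNNING on timeout.
       static Status awaitTerminal(Supplier<Status> statusSource,
                                   long timeoutMillis, long intervalMillis) {
           long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
           Status status = statusSource.get();
           while (status == Status.RUNNING && System.nanoTime() < deadline) {
               try {
                   Thread.sleep(intervalMillis);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   return status; // stop polling if the thread is interrupted
               }
               status = statusSource.get(); // take a fresh snapshot on every iteration
           }
           return status;
       }
   }
   ```

   The important detail is that the status is read again on every iteration; asserting on a snapshot taken before a retrigger would only ever see the old value.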



Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1410155628


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1970,6 +1970,17 @@ public void testCompactionDuplicate() throws Exception {
         // Unload topic make reader of compaction reconnect
         admin.topics().unload(topic);
 
+        Optional<Topic> topicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+        Assert.assertTrue(topicRef.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) topicRef.get();

Review Comment:
   In fact, we must wait for the topic to exist before we can continue. If the check is placed inside `Awaitility` and the topic still does not exist after more than 10 seconds, an error will still be reported.



Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833973085

   > As for the error report in number 3, it seems to be the problem of https://github.com/apache/pulsar/issues/21536?
   
   Why? We also need to fix it in `StrategicCompactionTest.testCompactionDuplicate`.


Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1835406944

   > Oh, Don't need to do that., StrategicCompactionTest already extends from CompactionTest.
   
   @coderzc I'm sorry, I misunderstood what you said. I am still learning the code structure of Pulsar and am not particularly familiar with it. The modification to `org.apache.pulsar.compaction.StrategicCompactionTest` has been removed. Please check again.


Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833830378

   @coderzc Something strange.
   
   **1. When `persistentTopic.compactionStatus()` is called after the topic is unloaded, `longRunningProcessStatus` is always `NOT_RUN`.**
   <img width="600" alt="CleanShot 2023-11-30 at 9 10 48@2x" src="https://github.com/apache/pulsar/assets/54846009/8f50b4bd-ac10-430b-b14f-14559b567a94">
   <img width="600" alt="CleanShot 2023-11-30 at 9 12 09@2x" src="https://github.com/apache/pulsar/assets/54846009/9bb2db60-72e5-4e73-afa1-551f1048f103">
   
   **2. Even if the topic already exists, the `compactedTopicContext` is not created correctly.**
   
   <img width="600" alt="CleanShot 2023-11-30 at 9 07 18@2x" src="https://github.com/apache/pulsar/assets/54846009/a41b80ac-a8b6-4231-995a-4ee67bc6b100">
   <img width="600" alt="CleanShot 2023-11-30 at 9 07 35@2x" src="https://github.com/apache/pulsar/assets/54846009/80f93cd4-fbcc-4759-b2cf-05e3310b606f">
   
   The problem still seems to be the `compactedTopicContext`: the `compactedLedger` hasn't been created yet
   (`org.apache.pulsar.compaction.CompactedTopicImpl#openCompactedLedger`).
   
   ```java
   PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false);
   ```
   
   https://github.com/apache/pulsar/blob/3377003dbe0f53b84298c5c26acbacd2b07c30e2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L2455-L2469
   
   **3. The error you sent earlier should be this issue. Is the consumer with the subscription name `__compaction` not reconnecting?**
   
   https://github.com/apache/pulsar/issues/21536
   
   ![CleanShot 2023-11-30 at 9 41 32@2x](https://github.com/apache/pulsar/assets/54846009/9d86b83b-2216-4654-8627-b3f285cd9547)
   




Re: [PR] [fix][test] Fix the bug caused by unload topic and compaction task failure after task is triggered [pulsar]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1835809319

   ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/21634?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#21634](https://app.codecov.io/gh/apache/pulsar/pull/21634?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (5ae340f) into [master](https://app.codecov.io/gh/apache/pulsar/commit/e1d06b5f54f08c09debab7a9a513b7c173c1779b?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (e1d06b5) will **increase** coverage by `36.49%`.
   > Report is 12 commits behind head on master.
   > The diff coverage is `n/a`.
   
   <details><summary>Additional details and impacted files</summary>
   
   
   [![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/21634/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/21634?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #21634       +/-   ##
   =============================================
   + Coverage     36.83%   73.33%   +36.49%     
   - Complexity      376    32740    +32364     
   =============================================
     Files          1715     1893      +178     
     Lines        131097   140730     +9633     
     Branches      14318    15500     +1182     
   =============================================
   + Hits          48288   103201    +54913     
   + Misses        76424    29421    -47003     
   - Partials       6385     8108     +1723     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pulsar/pull/21634/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [inttests](https://app.codecov.io/gh/apache/pulsar/pull/21634/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `24.15% <ø> (-0.09%)` | :arrow_down: |
   | [systests](https://app.codecov.io/gh/apache/pulsar/pull/21634/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `24.66% <ø> (-0.26%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pulsar/pull/21634/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `72.66% <ø> (+40.79%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   [see 1460 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/21634/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   </details>




Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833918250

   @Denovo1998 
   > 1. When `persistentTopic.compactionStatus()` is called after the topic is unloaded, `longRunningProcessStatus` is always `NOT_RUN`.
   
   Because after unloading, `persistentTopic.compactionStatus()` will be reset, so we cannot get the running status of the previous task through it.
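   
   The reset can be pictured with a small, dependency-free sketch. It assumes the compaction status is held by the loaded topic instance, so that a topic loaded again after an unload starts from `NOT_RUN`. All names here (`LoadedTopic`, `Broker`, `statusAfterReload`) are hypothetical stand-ins, not Pulsar classes.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Hypothetical, simplified model; these are not the Pulsar classes.
   public class CompactionStatusResetSketch {
       enum Status { NOT_RUN, RUNNING, SUCCESS, ERROR }
   
       // Stand-in for a loaded topic: the status is a field of the instance.
       static final class LoadedTopic {
           Status compactionStatus = Status.NOT_RUN;
       }
   
       // Stand-in for the broker's map of currently loaded topics.
       static final class Broker {
           private final Map<String, LoadedTopic> topics = new HashMap<>();
   
           // Returns the loaded instance, creating a fresh one if none is loaded.
           LoadedTopic getOrLoad(String name) {
               return topics.computeIfAbsent(name, n -> new LoadedTopic());
           }
   
           // Drops the loaded instance together with its in-memory status.
           void unload(String name) {
               topics.remove(name);
           }
       }
   
       // Starts a "compaction", unloads the topic, and reads the status again.
       static Status statusAfterReload() {
           Broker broker = new Broker();
           LoadedTopic before = broker.getOrLoad("my-topic");
           before.compactionStatus = Status.RUNNING;
   
           broker.unload("my-topic");
   
           // A new instance is created, so the earlier RUNNING state is not visible.
           LoadedTopic after = broker.getOrLoad("my-topic");
           return after.compactionStatus;
       }
   }
   ```
   
   Under this assumption, the status of the earlier task can only be read from a reference to the instance that was obtained before the unload.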




Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833937914

   > Even if the topic already exists, the compactedTopicContext is not created correctly.
   
   Maybe the compaction task isn't finished yet? `CompactedTopicContext` is created only after the compaction task is completed.




Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833158125

   > @Denovo1998 After looking at the code, I found that the `getCompactedTopicContext` method will wait for `compactedTopicContext` to be created, so we don't seem to need to wait for `compactedLedger` to be created in the test. I think we just need to wait for topic initialization to complete.
   > 
   > 
   
   After the topic initialization is complete, we also need to wait for the `CompactedTopicContext` to be created. Creating the `CompactedTopicContext` requires the `compactedLedger` to be created first; otherwise `ledgerId` will be -1, and an error is reported if the ledger has not been created after 10s, here:
   
   https://github.com/apache/pulsar/blob/4e7bff7f1d2a5fb5ef04c1e6428178675474ad39/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java#L298-L304
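   
   A minimal sketch of the difference between reading the ledger id immediately and waiting for it with a timeout. It uses only `java.util.concurrent`, and models the asynchronously created ledger as a `CompletableFuture<Long>`; the class and method names are hypothetical and are not the ones in `CompactedTopicImpl`.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   // Hypothetical illustration; not the code in CompactedTopicImpl.
   public class CompactedContextWaitSketch {
       // Sentinel used when the ledger has not been created yet.
       static final long NO_LEDGER = -1L;
   
       // Reads the ledger id without waiting; yields -1 while creation is pending.
       static long ledgerIdNow(CompletableFuture<Long> ledgerFuture) {
           return ledgerFuture.getNow(NO_LEDGER);
       }
   
       // Waits up to the given timeout for the ledger to be created.
       static long awaitLedgerId(CompletableFuture<Long> ledgerFuture,
                                 long timeout, TimeUnit unit) {
           try {
               return ledgerFuture.get(timeout, unit);
           } catch (TimeoutException e) {
               throw new IllegalStateException("ledger not created within timeout", e);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException("interrupted while waiting", e);
           } catch (ExecutionException e) {
               throw new IllegalStateException("ledger creation failed", e.getCause());
           }
       }
   }
   ```
   
   In a test, the waiting variant corresponds to asserting on the context only after it has been created, rather than reading it right after the topic is loaded.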
   
   




Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1833205492

   ```java
   2023-11-30T05:30:45,771 - WARN  - [broker-client-shared-internal-executor-4522-1:PersistentTopic@3397] - [persistent://my-property/use/my-ns/testCompactionDuplicate] Compaction failure.
   java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Consumer connect fail! consumer state:Connecting
   	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368) ~[?:?]
   	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377) ~[?:?]
   	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1152) ~[?:?]
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
   	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
   	at org.apache.pulsar.compaction.TwoPhaseCompactor.lambda$closeLedger$20(TwoPhaseCompactor.java:388) ~[classes/:?]
   	at org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncClose$1(PulsarMockLedgerHandle.java:96) ~[testmocks-3.2.0-SNAPSHOT.jar:4.16.3]
   	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
   	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
   	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
   	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[bookkeeper-common-4.16.3.jar:4.16.3]
   	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[bookkeeper-common-4.16.3.jar:4.16.3]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
   	at java.lang.Thread.run(Thread.java:840) ~[?:?]
   Caused by: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Consumer connect fail! consumer state:Connecting
   	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doImmediateAck(PersistentAcknowledgmentsGroupingTracker.java:363) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
   	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doCumulativeAck(PersistentAcknowledgmentsGroupingTracker.java:310) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
   	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:243) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
   	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:198) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
   	at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:566) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
   	at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:688) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
   	at org.apache.pulsar.client.impl.RawReaderImpl.acknowledgeCumulativeAsync(RawReaderImpl.java:90) ~[classes/:3.2.0-SNAPSHOT]
   	at org.apache.pulsar.compaction.TwoPhaseCompactor.lambda$phaseTwoSeekThenLoop$10(TwoPhaseCompactor.java:225) ~[classes/:?]
   	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150) ~[?:?]
   ```
   
   @Denovo1998 I found the error logs above in the CI report. I think this is the root cause, and that we can trigger compaction again after the topic is unloaded.




Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1410150884


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1970,6 +1970,17 @@ public void testCompactionDuplicate() throws Exception {
         // Unload topic make reader of compaction reconnect
         admin.topics().unload(topic);
 
+        Optional<Topic> topicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+        Assert.assertTrue(topicRef.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) topicRef.get();

Review Comment:
   Better to move this into the `Awaitility` block.
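   
   One reading of this suggestion: a lookup done once before polling is never repeated, whereas a lookup placed inside the polled block runs again on every attempt. The dependency-free sketch below shows that pattern; `pollUntilPresent` is a hypothetical stand-in for `Awaitility.await().untilAsserted(...)` and is not part of Awaitility or Pulsar.
   
   ```java
   import java.util.Optional;
   import java.util.function.Supplier;
   
   // Hypothetical helper that mimics the retry behaviour of a polling assertion.
   public class LookupInsidePollSketch {
       // Calls the lookup on every attempt until it yields a value or attempts run out.
       static <T> T pollUntilPresent(Supplier<Optional<T>> lookup,
                                     int maxAttempts, long sleepMillis) {
           for (int attempt = 1; attempt <= maxAttempts; attempt++) {
               Optional<T> result = lookup.get();
               if (result.isPresent()) {
                   return result.get();
               }
               try {
                   Thread.sleep(sleepMillis);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   throw new IllegalStateException("interrupted while polling", e);
               }
           }
           throw new IllegalStateException(
                   "value not present after " + maxAttempts + " attempts");
       }
   }
   ```
   
   Because the supplier is evaluated on each attempt, a topic that is still being reloaded on the first poll can be picked up on a later one.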





Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#issuecomment-1834330902

   > We also need to fix it in `StrategicCompactionTest.testCompactionDuplicate`: just retrigger compaction if the previous task failed after the topic was unloaded.
   
   @coderzc Do you mean to create a new test case in `org.apache.pulsar.compaction.StrategicCompactionTest`?
   
   **Everything has been changed. Please check again.**
   




Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1411583148


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,9 +1967,35 @@ public void testCompactionDuplicate() throws Exception {
         // Wait for phase one to complete
         Thread.sleep(500);
 
+        Optional<Topic> previousTopicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+        Assert.assertTrue(previousTopicRef.isPresent());
+        PersistentTopic previousPersistentTopic = (PersistentTopic) previousTopicRef.get();
+
         // Unload topic make reader of compaction reconnect
         admin.topics().unload(topic);
 
+        Awaitility.await().untilAsserted(() -> {
+            LongRunningProcessStatus previousLongRunningProcessStatus = previousPersistentTopic.compactionStatus();
+
+            Optional<Topic> currentTopicReference = pulsar.getBrokerService().getTopicReference(topic);
+            Assert.assertTrue(currentTopicReference.isPresent());
+            PersistentTopic currentPersistentTopic = (PersistentTopic) currentTopicReference.get();
+            LongRunningProcessStatus currentLongRunningProcessStatus = currentPersistentTopic.compactionStatus();
+
+            if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR
+                    && (currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.NOT_RUN
+                    || currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR)) {
+                // trigger compaction again
+                admin.topics().triggerCompaction(topic);
+                Assert.assertEquals(currentLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);
+            } else if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.RUNNING
+                    || previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.SUCCESS) {
+                Assert.assertEquals(previousLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);

Review Comment:
   This assertion always seems to be false.



##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,9 +1967,35 @@ public void testCompactionDuplicate() throws Exception {
         // Wait for phase one to complete
         Thread.sleep(500);
 
+        Optional<Topic> previousTopicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+        Assert.assertTrue(previousTopicRef.isPresent());
+        PersistentTopic previousPersistentTopic = (PersistentTopic) previousTopicRef.get();
+
         // Unload topic make reader of compaction reconnect
         admin.topics().unload(topic);
 
+        Awaitility.await().untilAsserted(() -> {
+            LongRunningProcessStatus previousLongRunningProcessStatus = previousPersistentTopic.compactionStatus();
+
+            Optional<Topic> currentTopicReference = pulsar.getBrokerService().getTopicReference(topic);
+            Assert.assertTrue(currentTopicReference.isPresent());
+            PersistentTopic currentPersistentTopic = (PersistentTopic) currentTopicReference.get();
+            LongRunningProcessStatus currentLongRunningProcessStatus = currentPersistentTopic.compactionStatus();
+
+            if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR
+                    && (currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.NOT_RUN
+                    || currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR)) {
+                // trigger compaction again
+                admin.topics().triggerCompaction(topic);
+                Assert.assertEquals(currentLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);

Review Comment:
   I think we can just return here.





Re: [PR] [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [pulsar]

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #21634:
URL: https://github.com/apache/pulsar/pull/21634#discussion_r1411596166


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1967,9 +1967,35 @@ public void testCompactionDuplicate() throws Exception {
         // Wait for phase one to complete
         Thread.sleep(500);
 
+        Optional<Topic> previousTopicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+        Assert.assertTrue(previousTopicRef.isPresent());
+        PersistentTopic previousPersistentTopic = (PersistentTopic) previousTopicRef.get();
+
         // Unload topic make reader of compaction reconnect
         admin.topics().unload(topic);
 
+        Awaitility.await().untilAsserted(() -> {
+            LongRunningProcessStatus previousLongRunningProcessStatus = previousPersistentTopic.compactionStatus();
+
+            Optional<Topic> currentTopicReference = pulsar.getBrokerService().getTopicReference(topic);
+            Assert.assertTrue(currentTopicReference.isPresent());
+            PersistentTopic currentPersistentTopic = (PersistentTopic) currentTopicReference.get();
+            LongRunningProcessStatus currentLongRunningProcessStatus = currentPersistentTopic.compactionStatus();
+
+            if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR
+                    && (currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.NOT_RUN
+                    || currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR)) {
+                // trigger compaction again
+                admin.topics().triggerCompaction(topic);
+                Assert.assertEquals(currentLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);

Review Comment:
   You are right. There are two situations:
   
   ```java
   1. The previous task failed, and compaction is then triggered again
   (previousLongRunningProcessStatus:ERROR, currentLongRunningProcessStatus:NOT_RUN -> RUNNING -> SUCCESS)
   2. The previous task succeeded
   (previousLongRunningProcessStatus:RUNNING -> SUCCESS, currentLongRunningProcessStatus:NOT_RUN)
   ```
   
   done.
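   
   The two situations can be written as a small decision function. This is only a sketch of the reasoning above, using a local `Status` enum with the same four values seen in the test; the `Action` enum and `decide` method are hypothetical and are not part of the PR.
   
   ```java
   // Hypothetical decision helper; it mirrors the two situations listed above.
   public class RetriggerDecisionSketch {
       enum Status { NOT_RUN, RUNNING, SUCCESS, ERROR }
   
       enum Action { RETRIGGER, WAIT, DONE }
   
       // previous: status of the task started before the unload.
       // current: status reported by the topic loaded after the unload.
       static Action decide(Status previous, Status current) {
           // Either the previous task or the retriggered one has finished.
           if (previous == Status.SUCCESS || current == Status.SUCCESS) {
               return Action.DONE;
           }
           // Situation 1: the previous task failed and nothing is running now.
           if (previous == Status.ERROR
                   && (current == Status.NOT_RUN || current == Status.ERROR)) {
               return Action.RETRIGGER;
           }
           // Otherwise a task is still in progress; poll again later.
           return Action.WAIT;
       }
   }
   ```
   
   Returning `WAIT` instead of asserting right after the retrigger lets the polling loop observe `SUCCESS` on a later attempt.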


