Posted to commits@pulsar.apache.org by "poorbarcode (via GitHub)" <gi...@apache.org> on 2023/11/08 08:48:09 UTC

[PR] [fix] [ml] Fix orphan task for ledger create timeout check [pulsar]

poorbarcode opened a new pull request, #21542:
URL: https://github.com/apache/pulsar/pull/21542

   ### Motivation
   
   When an ML (managed ledger) tries to create a new ledger, it schedules a delayed task that checks whether the ledger-create request has timed out<sup>[1]</sup>.
   
   However, this delayed task should be cancelled once the request to create the new ledger has finished. Otherwise, these orphaned tasks consume CPU resources unnecessarily<sup>[2]</sup>.
   
   **[1]**: https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L4070-L4082
   
   
   ```java
   bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
   
   scheduledExecutor.schedule(() -> {
       if (!ledgerCreated.get()) {
           cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
       }
   }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
   
   ```
   
   **[2]**: Thousands of delayed tasks in memory
   
   <img width="1450" alt="for_pr_1" src="https://github.com/apache/pulsar/assets/25195800/bd287044-6b02-4128-b9ac-2f0548b0a7cd">
   <img width="1447" alt="for_pr_2" src="https://github.com/apache/pulsar/assets/25195800/812c7071-ccce-4869-ba30-934d65997cc2">
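
The cancellation described in the motivation can be sketched in isolation as follows. This is a minimal illustration, not the Pulsar implementation: `TimeoutCheckerSketch`, `watch`, and `checkerCancelledAfterSuccess` are hypothetical names, and a plain `CompletableFuture<String>` stands in for the ledger-create request. It relies only on `java.util.concurrent`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: names and types are illustrative, not taken from Pulsar.
final class TimeoutCheckerSketch {

    /**
     * Schedules a timeout check for {@code operation} and cancels that check
     * as soon as the operation completes, whether normally or exceptionally.
     */
    static <T> ScheduledFuture<?> watch(CompletableFuture<T> operation,
                                        ScheduledExecutorService scheduler,
                                        long timeout, TimeUnit unit) {
        ScheduledFuture<?> timeoutChecker = scheduler.schedule(() -> {
            // Only has an effect if the operation is still pending.
            operation.completeExceptionally(new TimeoutException("operation timed out"));
        }, timeout, unit);
        // Without this line the task would stay scheduled until the timeout elapses.
        operation.whenComplete((ignore, ex) -> timeoutChecker.cancel(false));
        return timeoutChecker;
    }

    /** Completes one operation successfully and reports whether its timeout task was cancelled. */
    static boolean checkerCancelledAfterSuccess() {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        try {
            CompletableFuture<String> operation = new CompletableFuture<>();
            ScheduledFuture<?> checker = watch(operation, scheduler, 60, TimeUnit.SECONDS);
            operation.complete("ledger-1");
            return checker.isCancelled();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("cancelled: " + checkerCancelledAfterSuccess());
    }
}
```

Because the `whenComplete` action is registered before the operation completes, it runs on the completing thread, so the checker is already cancelled when `complete` returns. Note that a `ScheduledThreadPoolExecutor` by default keeps a cancelled task in its queue until the delay elapses unless `setRemoveOnCancelPolicy(true)` is set; whether that matters depends on the executor actually in use.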
   
   
   
   ### Modifications
   
   <!-- Describe the modifications you've done. -->
   
   
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: x
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [fix] [ml] Fix orphan scheduled task for ledger create timeout check [pulsar]

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #21542:
URL: https://github.com/apache/pulsar/pull/21542#discussion_r1389058313


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -4052,33 +4051,40 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
                 ));
             } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
                 log.error("[{}] Serialize the placement configuration failed", name, e);
-                cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+                cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
                 return;
             }
         }
         createdLedgerCustomMetadata = finalMetadata;
-
         try {
             bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
-                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerFutureHook, finalMetadata);
         } catch (Throwable cause) {
             log.error("[{}] Encountered unexpected error when creating ledger",
                 name, cause);
-            cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+            ledgerFutureHook.completeExceptionally(cause);
+            cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
             return;
         }
-        scheduledExecutor.schedule(() -> {
-            if (!ledgerCreated.get()) {
+
+        ScheduledFuture timeoutChecker = scheduledExecutor.schedule(() -> {
+            if (ledgerFutureHook.completeExceptionally(new TimeoutException(name + " Create ledger timeout"))) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Timeout creating ledger", name);
                 }
-                cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
+                cb.createComplete(BKException.Code.TimeoutException, null, ledgerFutureHook);
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Ledger already created when timeout task is triggered", name);
                 }
             }
         }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
+
+        ledgerFutureHook.whenComplete((ignore, ex) -> {
+            if (!timeoutChecker.isDone()) {

Review Comment:
   From the API description
   
   ```
   /**
        * Attempts to cancel execution of this task.  This method has no
        * effect if the task is already completed or cancelled, or could
        * not be cancelled for some other reason.  Otherwise, if this
        * task has not started when {@code cancel} is called, this task
        * should never run.  If the task has already started, then the
        * {@code mayInterruptIfRunning} parameter determines whether the
        * thread executing this task (when known by the implementation)
        * is interrupted in an attempt to stop the task.
   ```
   
   It looks like we don't need this check.
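
The quoted Javadoc can be checked with a small standalone experiment. This is only a sketch under the assumption that a JDK `ScheduledExecutorService` behaves like the executor used in the PR; `CancelAfterDoneSketch` and `cancelFinishedTask` are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: shows that cancel() on a finished task is a harmless no-op.
final class CancelAfterDoneSketch {

    /** Returns {result of cancel(false), isCancelled()} for a task that has already run. */
    static boolean[] cancelFinishedTask() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<?> task = scheduler.schedule(() -> { }, 0, TimeUnit.MILLISECONDS);
            task.get(); // block until the task has completed
            boolean cancelResult = task.cancel(false); // no isDone() check beforehand
            return new boolean[] {cancelResult, task.isCancelled()};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] result = cancelFinishedTask();
        System.out.println("cancel returned: " + result[0] + ", isCancelled: " + result[1]);
    }
}
```

Calling `cancel` on the completed task returns `false` and leaves the task's state unchanged, which is why a preceding `isDone()` check adds nothing.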





Re: [PR] [fix] [ml] Fix orphan scheduled task for ledger create timeout check [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21542:
URL: https://github.com/apache/pulsar/pull/21542#discussion_r1390134908


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -4052,33 +4051,39 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
                 ));
             } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
                 log.error("[{}] Serialize the placement configuration failed", name, e);
-                cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+                cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
                 return;
             }
         }
         createdLedgerCustomMetadata = finalMetadata;
-
         try {
             bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
-                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerFutureHook, finalMetadata);
         } catch (Throwable cause) {
             log.error("[{}] Encountered unexpected error when creating ledger",
                 name, cause);
-            cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+            ledgerFutureHook.completeExceptionally(cause);
+            cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);

Review Comment:
   > I am curious why method bookKeeper.asyncCreateLedger will throw an exception but not call the callback. How does the caller know whether a callback is called or not?
   
   I think the `try {} catch {}` here is just to ensure that every situation is handled correctly. Since `bookKeeper.asyncCreateLedger` tries to acquire a lock, maybe an `InterruptedException` could be thrown?
   
   > Because it has a risk of calling the callback method twice.
   
   There are 3 places that call `cb.createComplete`:
   - Catching an exception here
   - Creating a ledger successfully
   - Timeout
   
   If we get an exception here, it means something went wrong before switching threads to actually create the ledger, so the ledger will not be created successfully.
   
   If we get an exception here, it also means the timeout checker will not be created, because the method returns before scheduling it.
   
   So there is no risk of calling the callback method twice.
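
Separately from the argument above, the timeout path in the diff guards itself with the boolean returned by `completeExceptionally`. The sketch below illustrates that guard in isolation. It is not the Pulsar code: `SingleCallbackSketch` and `callbackInvocations` are hypothetical names, a counter stands in for `cb.createComplete`, and using `complete` on the success path is an assumption made for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a CompletableFuture used as a once-only guard for a callback.
final class SingleCallbackSketch {

    /** Simulates a successful create followed by a late timeout; returns how often the callback ran. */
    static int callbackInvocations() {
        CompletableFuture<String> hook = new CompletableFuture<>();
        AtomicInteger callbackCalls = new AtomicInteger(); // stand-in for the real callback

        // Success path: the first completion wins, so the callback is invoked.
        if (hook.complete("ledger-1")) {
            callbackCalls.incrementAndGet();
        }
        // Timeout path firing afterwards: completeExceptionally returns false, so it is skipped.
        if (hook.completeExceptionally(new TimeoutException("create ledger timeout"))) {
            callbackCalls.incrementAndGet();
        }
        return callbackCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("callback invocations: " + callbackInvocations());
    }
}
```

Only one of the competing completions can succeed, so at most one of the guarded branches runs regardless of which path arrives first.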





Re: [PR] [fix] [ml] Fix orphan scheduled task for ledger create timeout check [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21542:
URL: https://github.com/apache/pulsar/pull/21542#discussion_r1390081483


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -4052,33 +4051,39 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
                 ));
             } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
                 log.error("[{}] Serialize the placement configuration failed", name, e);
-                cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+                cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
                 return;
             }
         }
         createdLedgerCustomMetadata = finalMetadata;
-
         try {
             bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
-                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerFutureHook, finalMetadata);
         } catch (Throwable cause) {
             log.error("[{}] Encountered unexpected error when creating ledger",
                 name, cause);
-            cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+            ledgerFutureHook.completeExceptionally(cause);
+            cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
             return;
         }
-        scheduledExecutor.schedule(() -> {
-            if (!ledgerCreated.get()) {
+
+        ScheduledFuture timeoutChecker = scheduledExecutor.schedule(() -> {
+            if (!ledgerFutureHook.isDone()
+                    && ledgerFutureHook.completeExceptionally(new TimeoutException(name + " Create ledger timeout"))) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Timeout creating ledger", name);

Review Comment:
   I am wondering if it should be a warning log. :) WDYT?





Re: [PR] [fix] [ml] Fix orphan scheduled task for ledger create timeout check [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21542:
URL: https://github.com/apache/pulsar/pull/21542#discussion_r1390129481


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -4052,33 +4051,39 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
                 ));
             } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
                 log.error("[{}] Serialize the placement configuration failed", name, e);
-                cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+                cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
                 return;
             }
         }
         createdLedgerCustomMetadata = finalMetadata;
-
         try {
             bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
-                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerFutureHook, finalMetadata);
         } catch (Throwable cause) {
             log.error("[{}] Encountered unexpected error when creating ledger",
                 name, cause);
-            cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+            ledgerFutureHook.completeExceptionally(cause);
+            cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
             return;
         }
-        scheduledExecutor.schedule(() -> {
-            if (!ledgerCreated.get()) {
+
+        ScheduledFuture timeoutChecker = scheduledExecutor.schedule(() -> {
+            if (!ledgerFutureHook.isDone()
+                    && ledgerFutureHook.completeExceptionally(new TimeoutException(name + " Create ledger timeout"))) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Timeout creating ledger", name);

Review Comment:
   Since it provides an `Error code` to the caller, the caller will print an error log. See https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1561





Re: [PR] [fix] [ml] Fix orphan scheduled task for ledger create timeout check [pulsar]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #21542:
URL: https://github.com/apache/pulsar/pull/21542#issuecomment-1806497231

   ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/21542?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#21542](https://app.codecov.io/gh/apache/pulsar/pull/21542?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (4c2477c) into [master](https://app.codecov.io/gh/apache/pulsar/commit/44abba9922e5e02099bab79591e2327845772f16?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (44abba9) will **decrease** coverage by `0.07%`.
   > Report is 9 commits behind head on master.
   > The diff coverage is `50.00%`.
   
   [![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/21542/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/21542?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #21542      +/-   ##
   ============================================
   - Coverage     73.27%   73.20%   -0.07%     
   + Complexity    32676    32570     -106     
   ============================================
     Files          1892     1892              
     Lines        140632   140720      +88     
     Branches      15467    15479      +12     
   ============================================
   - Hits         103044   103018      -26     
   - Misses        29497    29588      +91     
   - Partials       8091     8114      +23     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pulsar/pull/21542/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [inttests](https://app.codecov.io/gh/apache/pulsar/pull/21542/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `24.12% <42.85%> (-0.22%)` | :arrow_down: |
   | [systests](https://app.codecov.io/gh/apache/pulsar/pull/21542/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `24.70% <42.85%> (-0.28%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pulsar/pull/21542/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `72.50% <50.00%> (-0.05%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Files](https://app.codecov.io/gh/apache/pulsar/pull/21542?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...che/bookkeeper/mledger/impl/ManagedLedgerImpl.java](https://app.codecov.io/gh/apache/pulsar/pull/21542?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-bWFuYWdlZC1sZWRnZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2Jvb2trZWVwZXIvbWxlZGdlci9pbXBsL01hbmFnZWRMZWRnZXJJbXBsLmphdmE=) | `80.64% <50.00%> (-0.39%)` | :arrow_down: |
   
   ... and [100 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/21542/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   




Re: [PR] [fix] [ml] Fix orphan scheduled task for ledger create timeout check [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21542:
URL: https://github.com/apache/pulsar/pull/21542#discussion_r1390081988


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -4052,33 +4051,39 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
                 ));
             } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
                 log.error("[{}] Serialize the placement configuration failed", name, e);
-                cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+                cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
                 return;
             }
         }
         createdLedgerCustomMetadata = finalMetadata;
-
         try {
             bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
-                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerFutureHook, finalMetadata);
         } catch (Throwable cause) {
             log.error("[{}] Encountered unexpected error when creating ledger",
                 name, cause);
-            cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+            ledgerFutureHook.completeExceptionally(cause);
+            cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);

Review Comment:
   **Off-topic discussion:** 
   I am curious why method `bookKeeper.asyncCreateLedger` will throw an exception but not call the callback. How does the caller know whether a callback is called or not? 





Re: [PR] [fix] [ml] Fix orphan scheduled task for ledger create timeout check [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21542:
URL: https://github.com/apache/pulsar/pull/21542#discussion_r1390139587


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -4052,33 +4051,39 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
                 ));
             } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
                 log.error("[{}] Serialize the placement configuration failed", name, e);
-                cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+                cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
                 return;
             }
         }
         createdLedgerCustomMetadata = finalMetadata;
-
         try {
             bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
-                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerFutureHook, finalMetadata);
         } catch (Throwable cause) {
             log.error("[{}] Encountered unexpected error when creating ledger",
                 name, cause);
-            cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+            ledgerFutureHook.completeExceptionally(cause);
+            cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
             return;
         }
-        scheduledExecutor.schedule(() -> {
-            if (!ledgerCreated.get()) {
+
+        ScheduledFuture timeoutChecker = scheduledExecutor.schedule(() -> {
+            if (!ledgerFutureHook.isDone()
+                    && ledgerFutureHook.completeExceptionally(new TimeoutException(name + " Create ledger timeout"))) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Timeout creating ledger", name);
                 }
-                cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
+                cb.createComplete(BKException.Code.TimeoutException, null, ledgerFutureHook);
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Ledger already created when timeout task is triggered", name);
                 }
             }
         }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
+
+        ledgerFutureHook.whenComplete((ignore, ex) -> {

Review Comment:
   After talking with @mattisonchao, I am marking this comment as `Resolved`.





Re: [PR] [fix] [ml] Fix orphan scheduled task for ledger create timeout check [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21542:
URL: https://github.com/apache/pulsar/pull/21542#discussion_r1390081649


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -4039,7 +4038,7 @@ public static ManagedLedgerException createManagedLedgerException(Throwable t) {
      */
     protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig config, DigestType digestType,
             CreateCallback cb, Map<String, byte[]> metadata) {
-        AtomicBoolean ledgerCreated = new AtomicBoolean(false);
+        CompletableFuture<LedgerHandle> ledgerFutureHook = new CompletableFuture<>();

Review Comment:
   Why use `CompletableFuture<LedgerHandle>` and not `CompletableFuture<Void>`?





Re: [PR] [fix] [ml] Fix orphan scheduled task for ledger create timeout check [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21542:
URL: https://github.com/apache/pulsar/pull/21542#discussion_r1389084574


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -4052,33 +4051,40 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
                 ));
             } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
                 log.error("[{}] Serialize the placement configuration failed", name, e);
-                cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+                cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
                 return;
             }
         }
         createdLedgerCustomMetadata = finalMetadata;
-
         try {
             bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
-                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerFutureHook, finalMetadata);
         } catch (Throwable cause) {
             log.error("[{}] Encountered unexpected error when creating ledger",
                 name, cause);
-            cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+            ledgerFutureHook.completeExceptionally(cause);
+            cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
             return;
         }
-        scheduledExecutor.schedule(() -> {
-            if (!ledgerCreated.get()) {
+
+        ScheduledFuture timeoutChecker = scheduledExecutor.schedule(() -> {
+            if (ledgerFutureHook.completeExceptionally(new TimeoutException(name + " Create ledger timeout"))) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Timeout creating ledger", name);
                 }
-                cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
+                cb.createComplete(BKException.Code.TimeoutException, null, ledgerFutureHook);
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Ledger already created when timeout task is triggered", name);
                 }
             }
         }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
+
+        ledgerFutureHook.whenComplete((ignore, ex) -> {
+            if (!timeoutChecker.isDone()) {

Review Comment:
   Removed





Re: [PR] [fix] [ml] Fix orphan scheduled task for ledger create timeout check [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21542:
URL: https://github.com/apache/pulsar/pull/21542#discussion_r1390133015


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -4052,33 +4051,39 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
                 ));
             } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
                 log.error("[{}] Serialize the placement configuration failed", name, e);
-                cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+                cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
                 return;
             }
         }
         createdLedgerCustomMetadata = finalMetadata;
-
         try {
             bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
-                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerFutureHook, finalMetadata);
         } catch (Throwable cause) {
             log.error("[{}] Encountered unexpected error when creating ledger",
                 name, cause);
-            cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+            ledgerFutureHook.completeExceptionally(cause);
+            cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
             return;
         }
-        scheduledExecutor.schedule(() -> {
-            if (!ledgerCreated.get()) {
+
+        ScheduledFuture timeoutChecker = scheduledExecutor.schedule(() -> {
+            if (!ledgerFutureHook.isDone()
+                    && ledgerFutureHook.completeExceptionally(new TimeoutException(name + " Create ledger timeout"))) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Timeout creating ledger", name);
                 }
-                cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
+                cb.createComplete(BKException.Code.TimeoutException, null, ledgerFutureHook);
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Ledger already created when timeout task is triggered", name);
                 }
             }
         }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
+
+        ledgerFutureHook.whenComplete((ignore, ex) -> {

Review Comment:
   > @mattisonchao : What is the benefit of using CompletableFuture? Because it has an atomic cancel method, why not pass timeoutChecker directly?
   
   It just creates a channel that makes tracing easier.
   
   I could not fully understand this question. Could you explain it?





Re: [PR] [fix] [ml] Fix orphan scheduled task for ledger create timeout check [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21542:
URL: https://github.com/apache/pulsar/pull/21542#discussion_r1390128868


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -4039,7 +4038,7 @@ public static ManagedLedgerException createManagedLedgerException(Throwable t) {
      */
     protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig config, DigestType digestType,
             CreateCallback cb, Map<String, byte[]> metadata) {
-        AtomicBoolean ledgerCreated = new AtomicBoolean(false);
+        CompletableFuture<LedgerHandle> ledgerFutureHook = new CompletableFuture<>();

Review Comment:
   > Why use `CompletableFuture<LedgerHandle>` rather than `CompletableFuture<Void>`?
   
   I think `CompletableFuture<LedgerHandle>` is better because the type makes clear what the future is for, and it has no additional cost.





Re: [PR] [fix] [ml] Fix orphan scheduled task for ledger create timeout check [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode merged PR #21542:
URL: https://github.com/apache/pulsar/pull/21542




Re: [PR] [fix] [ml] Fix orphan scheduled task for ledger create timeout check [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21542:
URL: https://github.com/apache/pulsar/pull/21542#discussion_r1390081406


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -4052,33 +4051,39 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
                 ));
             } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
                 log.error("[{}] Serialize the placement configuration failed", name, e);
-                cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+                cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
                 return;
             }
         }
         createdLedgerCustomMetadata = finalMetadata;
-
         try {
             bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
-                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerFutureHook, finalMetadata);
         } catch (Throwable cause) {
             log.error("[{}] Encountered unexpected error when creating ledger",
                 name, cause);
-            cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+            ledgerFutureHook.completeExceptionally(cause);
+            cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
             return;
         }
-        scheduledExecutor.schedule(() -> {
-            if (!ledgerCreated.get()) {
+
+        ScheduledFuture timeoutChecker = scheduledExecutor.schedule(() -> {
+            if (!ledgerFutureHook.isDone()
+                    && ledgerFutureHook.completeExceptionally(new TimeoutException(name + " Create ledger timeout"))) {

Review Comment:
   Calling only `ledgerFutureHook.completeExceptionally(new TimeoutException(name + " Create ledger timeout"))` is enough here, isn't it? The preceding `isDone()` check looks redundant.
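
The point above relies on the documented behaviour of `CompletableFuture.completeExceptionally`: it returns `true` only when the call itself moved the future to a completed state, and `false` when the future was already done. A small sketch (the class and method names `CompleteOnceSketch` and `tryTimeout` are made up for illustration):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; only the CompletableFuture calls are real API.
class CompleteOnceSketch {
    // Returns true only if this call transitioned the future to completed.
    static boolean tryTimeout(CompletableFuture<String> hook) {
        return hook.completeExceptionally(new TimeoutException("Create ledger timeout"));
    }
}
```

On a pending future `tryTimeout` returns `true`; on a future that already holds a result it returns `false` and leaves the result untouched, so the return value alone is enough to decide whether the timeout branch should run.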





Re: [PR] [fix] [ml] Fix orphan scheduled task for ledger create timeout check [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21542:
URL: https://github.com/apache/pulsar/pull/21542#discussion_r1390081988


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -4052,33 +4051,39 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
                 ));
             } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
                 log.error("[{}] Serialize the placement configuration failed", name, e);
-                cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+                cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
                 return;
             }
         }
         createdLedgerCustomMetadata = finalMetadata;
-
         try {
             bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
-                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerFutureHook, finalMetadata);
         } catch (Throwable cause) {
             log.error("[{}] Encountered unexpected error when creating ledger",
                 name, cause);
-            cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+            ledgerFutureHook.completeExceptionally(cause);
+            cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);

Review Comment:
   **Off-topic discussion:** 
   I am curious why the method `bookKeeper.asyncCreateLedger` would throw an exception instead of invoking the callback. How does the caller know whether the callback has already been invoked? 
   If it has, calling `cb.createComplete` again in the `catch` block risks invoking the callback twice.
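
One generic way to guard against the double invocation raised above is to wrap the callback so that only the first call gets through. The sketch below is an assumption-laden illustration, not what the PR does: `OnceCallbackSketch` and `once` are hypothetical names, and `BiConsumer` merely stands in for BookKeeper's `CreateCallback`. It does not answer whether `asyncCreateLedger` can both throw and invoke the callback.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

// Hypothetical wrapper for illustration; BiConsumer stands in for CreateCallback.
class OnceCallbackSketch {
    static <A, B> BiConsumer<A, B> once(BiConsumer<A, B> delegate) {
        AtomicBoolean called = new AtomicBoolean(false);
        return (a, b) -> {
            // compareAndSet succeeds for exactly one caller, even under races.
            if (called.compareAndSet(false, true)) {
                delegate.accept(a, b);
            }
        };
    }
}
```

With such a wrapper, a second invocation from a `catch` block or a timeout task would be silently dropped rather than reaching the delegate.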


