Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/19 22:01:47 UTC

[GitHub] [pulsar] nicoloboschi opened a new pull request, #18989: [fix][broker] Fix deadlock in PendingAckHandleImpl

nicoloboschi opened a new pull request, #18989:
URL: https://github.com/apache/pulsar/pull/18989

   Fixes #18988
   
   ### Motivation
   
   Based on the stack trace in the issue: in `PendingAckHandleImpl`, if `pendingAckHandleCompletableFuture` is first completed on the `addConsumer` path, the current thread holds the lock on the `PendingAckHandleImpl` object, because the method `completeHandleFuture` is synchronized. `addConsumer` then locks the `PersistentSubscription`. The subscription closing procedure takes the locks in the reverse order: first the `PersistentSubscription` object, then the `PendingAckHandleImpl`.
   If the two paths run concurrently, they can end up in a deadlock.
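
The lock-order inversion described above can be sketched in isolation. This is only an illustration under stated assumptions: `handleLock` and `subscriptionLock` are hypothetical stand-ins for the monitors of `PendingAckHandleImpl` and `PersistentSubscription`, and a timed `tryLock` replaces the blocking monitor entry so that the demo terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-ins: handleLock plays the PendingAckHandleImpl monitor,
// subscriptionLock plays the PersistentSubscription monitor.
final class LockOrderSketch {

    // Returns how many of the two threads failed to acquire their second lock.
    static int countStuckThreads() {
        ReentrantLock handleLock = new ReentrantLock();
        ReentrantLock subscriptionLock = new ReentrantLock();
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        CountDownLatch bothTriedSecond = new CountDownLatch(2);
        AtomicInteger stuck = new AtomicInteger();

        // "addConsumer" path: handle first, then subscription.
        Thread addConsumer = new Thread(() ->
                lockInOrder(handleLock, subscriptionLock, bothHoldFirst, bothTriedSecond, stuck));
        // "close" path: subscription first, then handle.
        Thread close = new Thread(() ->
                lockInOrder(subscriptionLock, handleLock, bothHoldFirst, bothTriedSecond, stuck));

        addConsumer.start();
        close.start();
        try {
            addConsumer.join();
            close.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return stuck.get();
    }

    private static void lockInOrder(ReentrantLock first, ReentrantLock second,
                                    CountDownLatch bothHoldFirst, CountDownLatch bothTriedSecond,
                                    AtomicInteger stuck) {
        first.lock();
        try {
            bothHoldFirst.countDown();
            bothHoldFirst.await();       // wait until each thread holds its first lock
            // Timed attempt instead of a blocking acquire, so the demo cannot hang.
            if (second.tryLock(100, TimeUnit.MILLISECONDS)) {
                second.unlock();
            } else {
                stuck.incrementAndGet();
            }
            bothTriedSecond.countDown();
            bothTriedSecond.await();     // keep the first lock until both attempts are over
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            first.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads stuck on second lock: " + countStuckThreads());
    }
}
```

With blocking `synchronized` blocks in place of the timed `tryLock`, neither thread would ever get its second lock, which is the deadlock shape the stack trace points to.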
   
   ### Modifications
   
   * Remove `synchronized` from the `completeHandleFuture` method and synchronize only on the future. This way, the whole `PendingAckHandleImpl` object isn't locked while the chained stages run.
   
   
   I haven't added tests since I wasn't able to reproduce it programmatically.
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Documentation
   
   - [ ] `doc`
   - [ ] `doc-required`
   - [x] `doc-not-needed`
   - [ ] `doc-complete`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #18989: [fix][broker] Fix deadlock in PendingAckHandleImpl

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #18989:
URL: https://github.com/apache/pulsar/pull/18989#discussion_r1052685049


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -937,19 +937,24 @@ public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
         return transactionPendingAckStats;
     }
 
-    public synchronized void completeHandleFuture() {
-        if (!this.pendingAckHandleCompletableFuture.isDone()) {
-            this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
-        }
-        if (recoverTime.getRecoverStartTime() != 0L) {
-            recoverTime.setRecoverEndTime(System.currentTimeMillis());
+    public void completeHandleFuture() {
+        synchronized (this.pendingAckHandleCompletableFuture) {

Review Comment:
   Why are you using `synchronized` on this variable?
   This smells a little bit (synchronizing on a `CompletableFuture`).
   Also, that reference escapes this object because there is a getter, so I don't think we are going to deal with concurrency properly here.
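
The concern about the escaping reference can be illustrated with a small sketch. The `Handle` class below is a hypothetical miniature, not the Pulsar class: it synchronizes on a future that it also exposes through a getter, so any caller can take the same monitor and stall the completion.

```java
import java.util.concurrent.CompletableFuture;

final class EscapingMonitorSketch {

    // Hypothetical miniature of the handle: it locks on a future that it also hands out.
    static final class Handle {
        private final CompletableFuture<String> future = new CompletableFuture<>();

        CompletableFuture<String> getFuture() {
            return future;
        }

        void completeHandleFuture() {
            synchronized (future) {
                future.complete("ready");
            }
        }
    }

    // Returns true if a caller holding the future's monitor stalled completeHandleFuture()
    // and the completion went through once the monitor was released.
    static boolean outsiderBlocksCompletion() {
        Handle handle = new Handle();
        Thread completer = new Thread(handle::completeHandleFuture);
        boolean stalled = false;
        try {
            synchronized (handle.getFuture()) {   // any code that can call the getter can do this
                completer.start();
                completer.join(200);              // wait a little while still holding the monitor
                stalled = completer.isAlive() && !handle.getFuture().isDone();
            }
            completer.join();                     // monitor released, completion proceeds
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return stalled && handle.getFuture().isDone();
    }

    public static void main(String[] args) {
        System.out.println("outsider blocked completion: " + outsiderBlocksCompletion());
    }
}
```

Because the lock object is reachable from outside, the class cannot reason locally about who holds it, which is why a private lock object (or no lock at all) is usually preferred.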





[GitHub] [pulsar] dlg99 commented on a diff in pull request #18989: [fix][broker] Fix deadlock in PendingAckHandleImpl

Posted by GitBox <gi...@apache.org>.
dlg99 commented on code in PR #18989:
URL: https://github.com/apache/pulsar/pull/18989#discussion_r1052703052


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -937,19 +937,24 @@ public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
         return transactionPendingAckStats;
     }
 
-    public synchronized void completeHandleFuture() {
-        if (!this.pendingAckHandleCompletableFuture.isDone()) {
-            this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
-        }
-        if (recoverTime.getRecoverStartTime() != 0L) {
-            recoverTime.setRecoverEndTime(System.currentTimeMillis());
+    public void completeHandleFuture() {
+        synchronized (this.pendingAckHandleCompletableFuture) {

Review Comment:
   Do we need this at all (including the `if (!this.pendingAckHandleCompletableFuture.isDone())` check)?
   A `CompletableFuture` can be completed only once (the API contract says so and the implementation confirms it). If needed, one can use the return value of `complete()` to check whether it succeeded.
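
A minimal sketch of the behaviour referred to here, using only the standard `java.util.concurrent.CompletableFuture` API. The class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class CompleteOnceSketch {

    // Completes the future twice and reports what each call returned.
    static boolean[] completeTwice(CompletableFuture<String> future) {
        boolean first = future.complete("first");    // true: this call set the value
        boolean second = future.complete("second");  // false: already completed, value unchanged
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        boolean[] results = completeTwice(future);
        // prints "true false first"
        System.out.println(results[0] + " " + results[1] + " " + future.join());
    }
}
```

Since the second `complete()` is a no-op that returns `false`, a preceding `isDone()` check adds nothing, and a caller that needs to know whether it won the race can branch on the return value.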





[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #18989: [fix][broker] Fix deadlock in PendingAckHandleImpl

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #18989:
URL: https://github.com/apache/pulsar/pull/18989#discussion_r1052708426


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -937,19 +937,24 @@ public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
         return transactionPendingAckStats;
     }
 
-    public synchronized void completeHandleFuture() {
-        if (!this.pendingAckHandleCompletableFuture.isDone()) {
-            this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
-        }
-        if (recoverTime.getRecoverStartTime() != 0L) {
-            recoverTime.setRecoverEndTime(System.currentTimeMillis());
+    public void completeHandleFuture() {
+        synchronized (this.pendingAckHandleCompletableFuture) {

Review Comment:
   updated, thanks



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -937,7 +937,7 @@ public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
         return transactionPendingAckStats;
     }
 
-    public synchronized void completeHandleFuture() {
+    public void completeHandleFuture() {
         if (!this.pendingAckHandleCompletableFuture.isDone()) {
             this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
         }

Review Comment:
   updated, thanks





[GitHub] [pulsar] nicoloboschi merged pull request #18989: [fix][broker] Fix deadlock in PendingAckHandleImpl

Posted by GitBox <gi...@apache.org>.
nicoloboschi merged PR #18989:
URL: https://github.com/apache/pulsar/pull/18989




[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #18989: [fix][broker] Fix deadlock in PendingAckHandleImpl

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #18989:
URL: https://github.com/apache/pulsar/pull/18989#discussion_r1053638072


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -155,13 +155,13 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
                         .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
 
         pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
-                .thenAccept(init -> {
+                .thenAcceptAsync(init -> {
                     if (init) {
                         initPendingAckStore();
                     } else {
                         completeHandleFuture();
                     }
-                })
+                }, internalPinnedExecutor)

Review Comment:
   Thanks for the explanation.





[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #18989: [fix][broker] Fix deadlock in PendingAckHandleImpl

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #18989:
URL: https://github.com/apache/pulsar/pull/18989#discussion_r1053032593


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -937,18 +937,16 @@ public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
         return transactionPendingAckStats;
     }
 
-    public synchronized void completeHandleFuture() {
-        if (!this.pendingAckHandleCompletableFuture.isDone()) {
-            this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
-        }
-        if (recoverTime.getRecoverStartTime() != 0L) {
+    public void completeHandleFuture() {
+        this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
+        if (recoverTime.getRecoverStartTime() != 0L && recoverTime.getRecoverEndTime() == 0L) {
             recoverTime.setRecoverEndTime(System.currentTimeMillis());

Review Comment:
   You're totally right. `recoverTime` records the time of the replay, and it should be handled directly in the callback after the replay is completed. For the current usage, being non-thread-safe is fine, and I'd prefer to leave it as is.
   Those methods don't need synchronization at all since, at the moment, there's no data race for them.





[GitHub] [pulsar] codecov-commenter commented on pull request #18989: [fix][broker] Fix deadlock in PendingAckHandleImpl

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #18989:
URL: https://github.com/apache/pulsar/pull/18989#issuecomment-1359042062

   # [Codecov](https://codecov.io/gh/apache/pulsar/pull/18989?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#18989](https://codecov.io/gh/apache/pulsar/pull/18989?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d83daac) into [master](https://codecov.io/gh/apache/pulsar/commit/feb3cb4d7a484a284e06474713870609b220abfc?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (feb3cb4) will **increase** coverage by `1.25%`.
   > The diff coverage is `57.14%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #18989      +/-   ##
   ============================================
   + Coverage     46.35%   47.60%   +1.25%     
   - Complexity     8939     9465     +526     
   ============================================
     Files           597      626      +29     
     Lines         56858    59236    +2378     
     Branches       5905     6154     +249     
   ============================================
   + Hits          26357    28201    +1844     
   - Misses        27616    28019     +403     
   - Partials       2885     3016     +131     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `47.60% <57.14%> (+1.25%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/18989?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...nsaction/pendingack/impl/PendingAckHandleImpl.java](https://codecov.io/gh/apache/pulsar/pull/18989/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9wZW5kaW5nYWNrL2ltcGwvUGVuZGluZ0Fja0hhbmRsZUltcGwuamF2YQ==) | `51.15% <57.14%> (-0.63%)` | :arrow_down: |
   | [...g/apache/pulsar/client/impl/ConnectionHandler.java](https://codecov.io/gh/apache/pulsar/pull/18989/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0Nvbm5lY3Rpb25IYW5kbGVyLmphdmE=) | `50.00% <0.00%> (-5.32%)` | :arrow_down: |
   | [...ction/buffer/impl/TransactionBufferClientImpl.java](https://codecov.io/gh/apache/pulsar/pull/18989/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9idWZmZXIvaW1wbC9UcmFuc2FjdGlvbkJ1ZmZlckNsaWVudEltcGwuamF2YQ==) | `76.74% <0.00%> (-4.66%)` | :arrow_down: |
   | [.../buffer/impl/TransactionBufferClientStatsImpl.java](https://codecov.io/gh/apache/pulsar/pull/18989/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9idWZmZXIvaW1wbC9UcmFuc2FjdGlvbkJ1ZmZlckNsaWVudFN0YXRzSW1wbC5qYXZh) | `82.00% <0.00%> (-4.00%)` | :arrow_down: |
   | [...pulsar/broker/TransactionMetadataStoreService.java](https://codecov.io/gh/apache/pulsar/pull/18989/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9UcmFuc2FjdGlvbk1ldGFkYXRhU3RvcmVTZXJ2aWNlLmphdmE=) | `58.51% <0.00%> (-3.94%)` | :arrow_down: |
   | [...tion/buffer/impl/TransactionBufferHandlerImpl.java](https://codecov.io/gh/apache/pulsar/pull/18989/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9idWZmZXIvaW1wbC9UcmFuc2FjdGlvbkJ1ZmZlckhhbmRsZXJJbXBsLmphdmE=) | `50.00% <0.00%> (-2.54%)` | :arrow_down: |
   | [...a/org/apache/pulsar/client/impl/RawReaderImpl.java](https://codecov.io/gh/apache/pulsar/pull/18989/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1Jhd1JlYWRlckltcGwuamF2YQ==) | `83.90% <0.00%> (-1.15%)` | :arrow_down: |
   | [.../org/apache/pulsar/client/impl/ConnectionPool.java](https://codecov.io/gh/apache/pulsar/pull/18989/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0Nvbm5lY3Rpb25Qb29sLmphdmE=) | `37.43% <0.00%> (-1.03%)` | :arrow_down: |
   | [...roker/loadbalance/impl/ModularLoadManagerImpl.java](https://codecov.io/gh/apache/pulsar/pull/18989/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9sb2FkYmFsYW5jZS9pbXBsL01vZHVsYXJMb2FkTWFuYWdlckltcGwuamF2YQ==) | `67.78% <0.00%> (-0.51%)` | :arrow_down: |
   | [...rg/apache/pulsar/broker/service/AbstractTopic.java](https://codecov.io/gh/apache/pulsar/pull/18989/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL0Fic3RyYWN0VG9waWMuamF2YQ==) | `64.76% <0.00%> (-0.45%)` | :arrow_down: |
   | ... and [73 more](https://codecov.io/gh/apache/pulsar/pull/18989/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   




[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #18989: [fix][broker] Fix deadlock in PendingAckHandleImpl

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #18989:
URL: https://github.com/apache/pulsar/pull/18989#discussion_r1052696221


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -937,19 +937,24 @@ public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
         return transactionPendingAckStats;
     }
 
-    public synchronized void completeHandleFuture() {
-        if (!this.pendingAckHandleCompletableFuture.isDone()) {
-            this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
-        }
-        if (recoverTime.getRecoverStartTime() != 0L) {
-            recoverTime.setRecoverEndTime(System.currentTimeMillis());
+    public void completeHandleFuture() {
+        synchronized (this.pendingAckHandleCompletableFuture) {

Review Comment:
   The getter exposes the future on purpose, but I agree we don't need `synchronized` here at all.





[GitHub] [pulsar] dlg99 commented on a diff in pull request #18989: [fix][broker] Fix deadlock in PendingAckHandleImpl

Posted by GitBox <gi...@apache.org>.
dlg99 commented on code in PR #18989:
URL: https://github.com/apache/pulsar/pull/18989#discussion_r1052704024


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -937,7 +937,7 @@ public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
         return transactionPendingAckStats;
     }
 
-    public synchronized void completeHandleFuture() {
+    public void completeHandleFuture() {
         if (!this.pendingAckHandleCompletableFuture.isDone()) {
             this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
         }

Review Comment:
   Do we need this check?
   A `CompletableFuture` can be completed only once (the API contract says so and the implementation confirms it). If needed, one can use the return value of `complete()` to check whether it succeeded.





[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #18989: [fix][broker] Fix deadlock in PendingAckHandleImpl

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #18989:
URL: https://github.com/apache/pulsar/pull/18989#discussion_r1052956640


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -937,18 +937,16 @@ public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
         return transactionPendingAckStats;
     }
 
-    public synchronized void completeHandleFuture() {
-        if (!this.pendingAckHandleCompletableFuture.isDone()) {
-            this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
-        }
-        if (recoverTime.getRecoverStartTime() != 0L) {
+    public void completeHandleFuture() {
+        this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
+        if (recoverTime.getRecoverStartTime() != 0L && recoverTime.getRecoverEndTime() == 0L) {
             recoverTime.setRecoverEndTime(System.currentTimeMillis());

Review Comment:
   Without the `synchronized` keyword in the method declaration, this conditional update to `recoverEndTime` is subject to data races, and the variables will not be safely published to other threads. However, at this time the `recoverTime` object is only used for stats, and we tend to relax requirements for stats, so perhaps it is fine. Alternatively, we could add methods to the class that ensure proper synchronization of updates. I don't feel strongly, except to mention that it is somewhat brittle to assume the variable will only ever be used by stats.
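
One possible shape for the "methods that ensure proper synchronization" alternative, as a sketch only. `RecoverTimeSketch` and its method names are hypothetical and are not the class used in Pulsar; the sketch assumes timestamps are non-zero, with `0` meaning "not set".

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stats holder: both timestamps are published safely through AtomicLong.
final class RecoverTimeSketch {
    private final AtomicLong recoverStartTime = new AtomicLong();
    private final AtomicLong recoverEndTime = new AtomicLong();

    // Records the start time once; later calls leave the first value in place.
    void markStart(long nowMillis) {
        recoverStartTime.compareAndSet(0L, nowMillis);
    }

    // Records the end time only if recovery has started and no end time is set yet.
    // Returns true if this call recorded the value.
    boolean markEndIfStarted(long nowMillis) {
        return recoverStartTime.get() != 0L
                && recoverEndTime.compareAndSet(0L, nowMillis);
    }

    long getRecoverStartTime() {
        return recoverStartTime.get();
    }

    long getRecoverEndTime() {
        return recoverEndTime.get();
    }

    public static void main(String[] args) {
        RecoverTimeSketch recoverTime = new RecoverTimeSketch();
        recoverTime.markStart(System.currentTimeMillis());
        System.out.println("end recorded: " + recoverTime.markEndIfStarted(System.currentTimeMillis()));
    }
}
```

The check-then-set on the end time collapses into a single `compareAndSet`, so concurrent callers cannot overwrite each other and readers on other threads see the published value without any monitor being held.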



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -155,13 +155,13 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
                         .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
 
         pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
-                .thenAccept(init -> {
+                .thenAcceptAsync(init -> {
                     if (init) {
                         initPendingAckStore();
                     } else {
                         completeHandleFuture();
                     }
-                })
+                }, internalPinnedExecutor)

Review Comment:
   Out of curiosity, how did you select this thread to run the task?




