Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/04 08:15:23 UTC

[GitHub] [pulsar] mattisonchao opened a new pull request, #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

mattisonchao opened a new pull request, #15021:
URL: https://github.com/apache/pulsar/pull/15021

   ### Motivation
   
   Currently, every request creates its own brand-new timeout task. Instead, we can use a single global checker that checks whether the first (oldest) outstanding operation has timed out and then moves on to the subsequent requests, which avoids creating many timeout tasks.
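A minimal sketch of the idea, not the code in this PR. It assumes request ids increase with creation time, so the first entry of a sorted map is always the oldest request. `Op`, `register` and `sweep` are hypothetical names, and the caller passes in the current time so the sweep can be driven by any timer.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;

final class GlobalTimeoutChecker {
    // Hypothetical stand-in for OpRequestSend.
    static final class Op {
        final long requestId;
        final long createdAtMs;
        final CompletableFuture<Void> cb = new CompletableFuture<>();

        Op(long requestId, long createdAtMs) {
            this.requestId = requestId;
            this.createdAtMs = createdAtMs;
        }
    }

    private final ConcurrentSkipListMap<Long, Op> outstanding = new ConcurrentSkipListMap<>();
    private final long timeoutMs;

    GlobalTimeoutChecker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    void register(Op op) {
        outstanding.put(op.requestId, op);
    }

    // One pass of the global checker: expire requests from the head of the map
    // and stop at the first one that is still within its deadline.
    // Returns the number of requests this pass timed out.
    int sweep(long nowMs) {
        int expired = 0;
        Map.Entry<Long, Op> head;
        while ((head = outstanding.firstEntry()) != null) {
            Op op = head.getValue();
            if (nowMs - op.createdAtMs < timeoutMs) {
                break; // oldest request is still alive, so newer ones are too
            }
            // remove(key, value) succeeds for exactly one caller, so a request that
            // another thread already removed is skipped instead of failed twice.
            if (outstanding.remove(head.getKey(), op)
                    && op.cb.completeExceptionally(
                            new TimeoutException("request " + op.requestId + " timed out"))) {
                expired++;
            }
        }
        return expired;
    }
}
```

A single scheduled task can call `sweep(System.currentTimeMillis())` periodically, so the number of timer tasks stays constant no matter how many requests are in flight.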
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Documentation
    
   - [x] `no-need-doc` 
     
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#discussion_r841756022


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -287,6 +288,25 @@ private void checkPendingRequests() {
         }
     }
 
+    private void recursiveCheckOutstandingRequestIfTimeout() {
+        while (true) {
+            Map.Entry<Long, OpRequestSend> entry = outstandingRequests.firstEntry();
+            if (entry == null) {
+                timer.newTimeout(task -> recursiveCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);
+                return;
+            }
+            OpRequestSend op = entry.getValue();
+            if (op.cb.isDone() || (System.currentTimeMillis() - op.createdAt) < operationTimeoutInMills) {
+                timer.newTimeout(task -> recursiveCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);
+                return;
+            }
+            outstandingRequests.remove(entry.getKey());
+            op.cb.completeExceptionally(new TransactionBufferClientException
+                    .RequestTimeoutException());
+            onResponse(op);

Review Comment:
   The logic does not seem right.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#discussion_r842255674


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -154,17 +156,16 @@ public void endTxn(OpRequestSend op) {
             if (throwable == null) {
                 if (clientCnx.ctx().channel().isActive()) {
                     clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
-                    outstandingRequests.put(op.requestId, op);
-                    timer.newTimeout(timeout -> {
-                        OpRequestSend peek = outstandingRequests.remove(op.requestId);
-                        if (peek != null && !peek.cb.isDone() && !peek.cb.isCompletedExceptionally()) {
-                            peek.cb.completeExceptionally(new TransactionBufferClientException
-                                    .RequestTimeoutException());
-                            onResponse(peek);
-                        }
-                    }, operationTimeoutInMills, TimeUnit.MILLISECONDS);
                     op.cmd.retain();
-                    clientCnx.ctx().writeAndFlush(op.cmd, clientCnx.ctx().voidPromise());
+                    clientCnx.ctx().writeAndFlush(op.cmd)
+                            .addListener(result -> {
+                                if (result.isSuccess()) {
+                                    outstandingRequests.put(op.requestId, op);
+                                } else {

Review Comment:
   Yes, you are right. We may hit the case where the response arrives before the write-success callback runs.
   I need to fix it. Thanks ~





[GitHub] [pulsar] mattisonchao commented on pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#issuecomment-1100816452

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] mattisonchao commented on pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#issuecomment-1089593846

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] mattisonchao commented on pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#issuecomment-1101242733

   After discussion with @congbobo184,
   we think this change is more complex than the current implementation, and the benefit is not very high.
   So I have chosen to close this PR and will reopen it when needed.




[GitHub] [pulsar] mattisonchao commented on pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#issuecomment-1100816420

   Hi, @codelipenghui  @congbobo184 
   
   I changed the timeout check logic. PTAL when you have time. :-)




[GitHub] [pulsar] congbobo184 commented on a diff in pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#discussion_r851945509


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -222,6 +233,42 @@ public void handleEndTxnOnSubscriptionResponse(long requestId,
         }
     }
 
+    private void continueCheckOutstandingRequestIfTimeout() {
+        Map.Entry<Long, OpRequestSend> entry = outstandingRequests.firstEntry();
+        if (entry == null) {
+            timoutCheckIsRunning.set(false);
+            // To avoid race conditions, the scenarios are as follows.
+            // Thread A: after if(entry=null) => Thread B: put a new entry and then get timoutCheckIsRunning=true =>
+            // Thread A: set timoutCheckIsRunning = false
+            if (outstandingRequests.firstEntry() != null
+                    && timoutCheckIsRunning.compareAndSet(false, true)) {
+                 continueCheckOutstandingRequestIfTimeout();
+            }
+            return;
+        }
+        OpRequestSend op = entry.getValue();
+        final long finalRequestId = op.requestId;
+        if (!op.isTimeout(operationTimeoutInMills)) {
+            timer.newTimeout(task -> continueCheckOutstandingRequestIfTimeout(),
+                    outstandingRequests.firstEntry().getValue()
+                            .getTimeoutMs(operationTimeoutInMills), TimeUnit.MILLISECONDS);
+            return;
+        }
+        if (!op.tryOwnership(finalRequestId)) {
+            // Just safe check to ensure endless loop cause StackOverFlow,
+            // because other operation will remove this request firstly.
+            outstandingRequests.remove(finalRequestId);
+            // This operation already handle by other thread, we need to check next.
+            continueCheckOutstandingRequestIfTimeout();
+            return;
+        }
+        outstandingRequests.remove(entry.getKey());
+        op.cb.completeExceptionally(new TransactionBufferClientException

Review Comment:
   Yes, you are right. But we should remove `finalRequestId`: `outstandingRequests.remove(finalRequestId);`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -222,6 +233,42 @@ public void handleEndTxnOnSubscriptionResponse(long requestId,
         }
     }
 
+    private void continueCheckOutstandingRequestIfTimeout() {
+        Map.Entry<Long, OpRequestSend> entry = outstandingRequests.firstEntry();
+        if (entry == null) {
+            timoutCheckIsRunning.set(false);
+            // To avoid race conditions, the scenarios are as follows.
+            // Thread A: after if(entry=null) => Thread B: put a new entry and then get timoutCheckIsRunning=true =>
+            // Thread A: set timoutCheckIsRunning = false
+            if (outstandingRequests.firstEntry() != null
+                    && timoutCheckIsRunning.compareAndSet(false, true)) {
+                 continueCheckOutstandingRequestIfTimeout();
+            }
+            return;
+        }
+        OpRequestSend op = entry.getValue();
+        final long finalRequestId = op.requestId;
+        if (!op.isTimeout(operationTimeoutInMills)) {

Review Comment:
   If this op has already been completed by the response handler, the timeout delay is not correct.





[GitHub] [pulsar] mattisonchao commented on pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#issuecomment-1100815824

   /pulsarbot rerun-failure-checks






[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#discussion_r841811812


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -154,17 +156,16 @@ public void endTxn(OpRequestSend op) {
             if (throwable == null) {
                 if (clientCnx.ctx().channel().isActive()) {
                     clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
-                    outstandingRequests.put(op.requestId, op);
-                    timer.newTimeout(timeout -> {
-                        OpRequestSend peek = outstandingRequests.remove(op.requestId);
-                        if (peek != null && !peek.cb.isDone() && !peek.cb.isCompletedExceptionally()) {
-                            peek.cb.completeExceptionally(new TransactionBufferClientException
-                                    .RequestTimeoutException());
-                            onResponse(peek);
-                        }
-                    }, operationTimeoutInMills, TimeUnit.MILLISECONDS);
                     op.cmd.retain();
-                    clientCnx.ctx().writeAndFlush(op.cmd, clientCnx.ctx().voidPromise());
+                    clientCnx.ctx().writeAndFlush(op.cmd)
+                            .addListener(result -> {
+                                if (result.isSuccess()) {
+                                    outstandingRequests.put(op.requestId, op);
+                                } else {

Review Comment:
   `outstandingRequests.put(op.requestId, op)` should come before `writeAndFlush`.
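A rough sketch of why the order matters, not the PR's code. If the request is registered only in the write-success listener, a response that arrives before that listener runs finds nothing in the map and is dropped. The class below registers first and only undoes the registration if the write fails. `RegisterBeforeSend`, `request` and the `send` parameter (a stand-in for `writeAndFlush`) are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

final class RegisterBeforeSend {
    private final ConcurrentHashMap<Long, CompletableFuture<String>> outstanding =
            new ConcurrentHashMap<>();

    // Called by the (hypothetical) network layer when a response arrives.
    void handleResponse(long requestId, String result) {
        CompletableFuture<String> cb = outstanding.remove(requestId);
        if (cb != null) {
            cb.complete(result);
        }
        // A response for an unknown request id is silently dropped.
    }

    // `send` performs the write and returns a future that completes when the write finishes.
    CompletableFuture<String> request(long requestId, LongFunction<CompletableFuture<Void>> send) {
        CompletableFuture<String> cb = new CompletableFuture<>();
        // Register BEFORE writing, so a fast response always finds the callback.
        outstanding.put(requestId, cb);
        send.apply(requestId).whenComplete((ignored, err) -> {
            // On write failure, fail the request only if it is still registered.
            if (err != null && outstanding.remove(requestId, cb)) {
                cb.completeExceptionally(err);
            }
        });
        return cb;
    }
}
```

With this ordering the write listener handles only the failure path, and the success path needs no listener work at all.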





[GitHub] [pulsar] HQebupt commented on a diff in pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#discussion_r843397412


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java:
##########
@@ -18,18 +18,22 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.concurrent.DefaultThreadFactory;

Review Comment:
   Why was `io.netty.util.concurrent.DefaultThreadFactory` changed?





[GitHub] [pulsar] mattisonchao closed pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
mattisonchao closed pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.
URL: https://github.com/apache/pulsar/pull/15021




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#discussion_r851873706


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -222,6 +233,42 @@ public void handleEndTxnOnSubscriptionResponse(long requestId,
         }
     }
 
+    private void continueCheckOutstandingRequestIfTimeout() {
+        Map.Entry<Long, OpRequestSend> entry = outstandingRequests.firstEntry();
+        if (entry == null) {
+            timoutCheckIsRunning.set(false);
+            // To avoid race conditions, the scenarios are as follows.
+            // Thread A: after if(entry=null) => Thread B: put a new entry and then get timoutCheckIsRunning=true =>
+            // Thread A: set timoutCheckIsRunning = false
+            if (outstandingRequests.firstEntry() != null
+                    && timoutCheckIsRunning.compareAndSet(false, true)) {
+                 continueCheckOutstandingRequestIfTimeout();
+            }
+            return;
+        }
+        OpRequestSend op = entry.getValue();
+        final long finalRequestId = op.requestId;
+        if (!op.isTimeout(operationTimeoutInMills)) {
+            timer.newTimeout(task -> continueCheckOutstandingRequestIfTimeout(),
+                    outstandingRequests.firstEntry().getValue()
+                            .getTimeoutMs(operationTimeoutInMills), TimeUnit.MILLISECONDS);
+            return;
+        }
+        if (!op.tryOwnership(finalRequestId)) {
+            // Just safe check to ensure endless loop cause StackOverFlow,
+            // because other operation will remove this request firstly.
+            outstandingRequests.remove(finalRequestId);
+            // This operation already handle by other thread, we need to check next.
+            continueCheckOutstandingRequestIfTimeout();
+            return;
+        }
+        outstandingRequests.remove(entry.getKey());
+        op.cb.completeExceptionally(new TransactionBufferClientException

Review Comment:
   Hi, @congbobo184
   We use `op.tryOwnership(finalRequestId)` to avoid race conditions with recycling.
   This method uses the `requestId` so that we never take ownership of the wrong `OpRequestSend`.
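The implementation of `tryOwnership` is not shown in this thread, so the following is only one way such a guard could work, under the assumption that request ids are non-negative and never reused. A pooled object keeps the id of the request currently using it in an atomic field, and claiming it is a compare-and-set from that id to a sentinel. `RecyclableOp`, `reset` and `CLAIMED` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

final class RecyclableOp {
    // Sentinel meaning "some thread already owns completion of this op".
    private static final long CLAIMED = -1L;

    // Holds the id of the request currently using this object, or CLAIMED.
    private final AtomicLong owner = new AtomicLong(CLAIMED);

    // Called when the object is taken from the pool for a new request.
    void reset(long requestId) {
        owner.set(requestId);
    }

    // Succeeds for at most one caller, and only if the object still belongs to the
    // request the caller looked up. A caller holding a stale reference (the object
    // was recycled and reused for another request) fails the compare-and-set.
    boolean tryOwnership(long expectedRequestId) {
        return owner.compareAndSet(expectedRequestId, CLAIMED);
    }
}
```

For example, if the response handler claims and recycles the object for request 1 and the pool hands it out again for request 2, a timeout checker that still calls `tryOwnership(1)` gets `false` and leaves request 2 alone.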





[GitHub] [pulsar] mattisonchao commented on pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#issuecomment-1095875959

   I tried to use an atomic status to ensure thread safety. @codelipenghui  @Technoboy-  PTAL.
   
   PS: I un-assigned myself from this PR by mistake. Please help re-assign me. Thanks.




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#discussion_r843584594


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -287,6 +288,25 @@ private void checkPendingRequests() {
         }
     }
 
+    private void loopCheckOutstandingRequestIfTimeout() {
+        while (true) {
+            Map.Entry<Long, OpRequestSend> entry = outstandingRequests.firstEntry();
+            if (entry == null) {
+                timer.newTimeout(task -> loopCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);
+                return;
+            }
+            OpRequestSend op = entry.getValue();
+            if (op.cb.isDone() || (System.currentTimeMillis() - op.createdAt) < operationTimeoutInMills) {
+                timer.newTimeout(task -> loopCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);
+                return;
+            }
+            outstandingRequests.remove(entry.getKey());
+            op.cb.completeExceptionally(new TransactionBufferClientException
+                    .RequestTimeoutException());
+            onResponse(op);

Review Comment:
   This introduces a race condition: the op might be removed by another thread and recycled.





[GitHub] [pulsar] congbobo184 commented on a diff in pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#discussion_r850036911


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -74,6 +75,7 @@ public TransactionBufferHandlerImpl(PulsarService pulsarService, HashedWheelTime
         this.operationTimeoutInMills = operationTimeoutInMills;
         this.timer = timer;
         this.requestCredits = Math.max(100, maxConcurrentRequests);
+        timer.newTimeout(task -> loopCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);

Review Comment:
   Starting the timeout check only when a request comes in would be the right approach.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -134,16 +136,15 @@ public void endTxn(OpRequestSend op) {
                 if (clientCnx.ctx().channel().isActive()) {
                     clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
                     outstandingRequests.put(op.requestId, op);
-                    timer.newTimeout(timeout -> {
-                        OpRequestSend peek = outstandingRequests.remove(op.requestId);
-                        if (peek != null && !peek.cb.isDone() && !peek.cb.isCompletedExceptionally()) {
-                            peek.cb.completeExceptionally(new TransactionBufferClientException
-                                    .RequestTimeoutException());
-                            onResponse(peek);
-                        }
-                    }, operationTimeoutInMills, TimeUnit.MILLISECONDS);
                     op.cmd.retain();
-                    clientCnx.ctx().writeAndFlush(op.cmd, clientCnx.ctx().voidPromise());
+                    clientCnx.ctx().writeAndFlush(op.cmd)
+                            .addListener(result -> {
+                                if (!result.isSuccess()) {

Review Comment:
   This should check `tryOwnership`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -222,6 +223,27 @@ public void handleEndTxnOnSubscriptionResponse(long requestId,
         }
     }
 
+    private void loopCheckOutstandingRequestIfTimeout() {
+        while (true) {
+            Map.Entry<Long, OpRequestSend> entry = outstandingRequests.firstEntry();
+            if (entry == null) {
+                timer.newTimeout(task -> loopCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);
+                return;
+            }
+            OpRequestSend op = entry.getValue();
+            if (op.cb.isDone() || (System.currentTimeMillis() - op.createdAt) < operationTimeoutInMills
+                    // when request satisfy timeout condition, we should try to acquire ownership
+                    || !op.tryOwnership()) {
+                timer.newTimeout(task -> loopCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);

Review Comment:
   100 -> `System.currentTimeMillis() - op.createdAt`; this is the correct time.
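One possible reading of this suggestion is that the checker should wake up when the oldest request reaches its deadline instead of polling every 100 ms. A minimal sketch under that assumption follows. Note that it computes the remaining time (timeout minus elapsed), which is an interpretation rather than the literal expression in the comment. `TimeoutDelay` and `remainingMs` are hypothetical names.

```java
final class TimeoutDelay {
    private TimeoutDelay() {
    }

    // Delay until the request created at `createdAtMs` hits its deadline,
    // clamped to zero when the deadline has already passed.
    static long remainingMs(long createdAtMs, long timeoutMs, long nowMs) {
        long elapsed = nowMs - createdAtMs;
        return Math.max(0L, timeoutMs - elapsed);
    }
}
```

The result can be passed as the delay when rescheduling the checker, so an idle period costs one wake-up per deadline rather than ten per second.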



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -222,6 +223,27 @@ public void handleEndTxnOnSubscriptionResponse(long requestId,
         }
     }
 
+    private void loopCheckOutstandingRequestIfTimeout() {
+        while (true) {
+            Map.Entry<Long, OpRequestSend> entry = outstandingRequests.firstEntry();
+            if (entry == null) {
+                timer.newTimeout(task -> loopCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);
+                return;
+            }
+            OpRequestSend op = entry.getValue();
+            if (op.cb.isDone() || (System.currentTimeMillis() - op.createdAt) < operationTimeoutInMills
+                    // when request satisfy timeout condition, we should try to acquire ownership
+                    || !op.tryOwnership()) {

Review Comment:
   How about removing first and then calling `tryOwnership`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -222,6 +223,27 @@ public void handleEndTxnOnSubscriptionResponse(long requestId,
         }
     }
 
+    private void loopCheckOutstandingRequestIfTimeout() {
+        while (true) {
+            Map.Entry<Long, OpRequestSend> entry = outstandingRequests.firstEntry();
+            if (entry == null) {
+                timer.newTimeout(task -> loopCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);
+                return;
+            }
+            OpRequestSend op = entry.getValue();
+            if (op.cb.isDone() || (System.currentTimeMillis() - op.createdAt) < operationTimeoutInMills
+                    // when request satisfy timeout condition, we should try to acquire ownership
+                    || !op.tryOwnership()) {
+                timer.newTimeout(task -> loopCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);

Review Comment:
   If `op.tryOwnership()` returns false many times, the timeout may fire at the wrong time.





[GitHub] [pulsar] mattisonchao commented on pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#issuecomment-1097975071

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] congbobo184 commented on a diff in pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#discussion_r851852410


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -222,6 +233,42 @@ public void handleEndTxnOnSubscriptionResponse(long requestId,
         }
     }
 
+    private void continueCheckOutstandingRequestIfTimeout() {
+        Map.Entry<Long, OpRequestSend> entry = outstandingRequests.firstEntry();
+        if (entry == null) {
+            timoutCheckIsRunning.set(false);
+            // To avoid race conditions, the scenarios are as follows.
+            // Thread A: after if(entry=null) => Thread B: put a new entry and then get timoutCheckIsRunning=true =>
+            // Thread A: set timoutCheckIsRunning = false
+            if (outstandingRequests.firstEntry() != null
+                    && timoutCheckIsRunning.compareAndSet(false, true)) {
+                 continueCheckOutstandingRequestIfTimeout();
+            }
+            return;
+        }
+        OpRequestSend op = entry.getValue();
+        final long finalRequestId = op.requestId;
+        if (!op.isTimeout(operationTimeoutInMills)) {
+            timer.newTimeout(task -> continueCheckOutstandingRequestIfTimeout(),
+                    outstandingRequests.firstEntry().getValue()
+                            .getTimeoutMs(operationTimeoutInMills), TimeUnit.MILLISECONDS);
+            return;
+        }
+        if (!op.tryOwnership(finalRequestId)) {
+            // Just safe check to ensure endless loop cause StackOverFlow,
+            // because other operation will remove this request firstly.
+            outstandingRequests.remove(finalRequestId);
+            // This operation already handle by other thread, we need to check next.
+            continueCheckOutstandingRequestIfTimeout();
+            return;
+        }
+        outstandingRequests.remove(entry.getKey());
+        op.cb.completeExceptionally(new TransactionBufferClientException

Review Comment:
   Thread A => `firstEntry()`
   Thread B => `remove()` and recycle this `OpRequestSend`
   Timeout thread => `op.cb` no longer belongs to the `OpRequestSend` that Thread A read; it belongs to the recycled `OpRequestSend` from Thread B, which may then be completed exceptionally





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#discussion_r843398234


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java:
##########
@@ -18,18 +18,22 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.concurrent.DefaultThreadFactory;

Review Comment:
   Oh, I will fix it. Thanks.


