Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/14 02:59:41 UTC

[GitHub] [pulsar] congbobo184 commented on a diff in pull request #15021: [improve][transaction] Use a global timeout checker to avoid many timeout tasks.

congbobo184 commented on code in PR #15021:
URL: https://github.com/apache/pulsar/pull/15021#discussion_r850036911


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -74,6 +75,7 @@ public TransactionBufferHandlerImpl(PulsarService pulsarService, HashedWheelTime
         this.operationTimeoutInMills = operationTimeoutInMills;
         this.timer = timer;
         this.requestCredits = Math.max(100, maxConcurrentRequests);
+        timer.newTimeout(task -> loopCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);

Review Comment:
   It would be better to start the timeout check only once a request has arrived.
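
   A minimal sketch of starting the checker lazily, for illustration only. LazyTimeoutChecker, onRequest, complete and the Consumer<Runnable> scheduler (standing in for timer.newTimeout) are hypothetical names, not part of the PR. The flag is cleared before the queue is re-checked, so a request that arrives while a check is running is not missed.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical sketch: schedules a timeout check only while requests are outstanding. */
final class LazyTimeoutChecker {
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final Queue<Long> outstanding = new ConcurrentLinkedQueue<>();
    // Assumption: stands in for timer.newTimeout(task, delay, unit) in the PR.
    private final Consumer<Runnable> scheduler;

    LazyTimeoutChecker(Consumer<Runnable> scheduler) {
        this.scheduler = scheduler;
    }

    /** Called when a request is sent; starts the checker if it is not already pending. */
    void onRequest(long requestId) {
        outstanding.add(requestId);
        scheduleIfNeeded();
    }

    /** Called when a response arrives. */
    void complete(long requestId) {
        outstanding.remove(requestId);
    }

    /** One run of the checker; expiring timed-out requests is omitted here. */
    void check() {
        scheduled.set(false);
        // Re-check after clearing the flag so a concurrent onRequest is never lost.
        if (!outstanding.isEmpty()) {
            scheduleIfNeeded();
        }
    }

    private void scheduleIfNeeded() {
        if (scheduled.compareAndSet(false, true)) {
            scheduler.accept(this::check);
        }
    }
}
```

   With this shape, an idle handler schedules no timer tasks at all, and at most one check is pending at any time.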



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -134,16 +136,15 @@ public void endTxn(OpRequestSend op) {
                 if (clientCnx.ctx().channel().isActive()) {
                     clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
                     outstandingRequests.put(op.requestId, op);
-                    timer.newTimeout(timeout -> {
-                        OpRequestSend peek = outstandingRequests.remove(op.requestId);
-                        if (peek != null && !peek.cb.isDone() && !peek.cb.isCompletedExceptionally()) {
-                            peek.cb.completeExceptionally(new TransactionBufferClientException
-                                    .RequestTimeoutException());
-                            onResponse(peek);
-                        }
-                    }, operationTimeoutInMills, TimeUnit.MILLISECONDS);
                     op.cmd.retain();
-                    clientCnx.ctx().writeAndFlush(op.cmd, clientCnx.ctx().voidPromise());
+                    clientCnx.ctx().writeAndFlush(op.cmd)
+                            .addListener(result -> {
+                                if (!result.isSuccess()) {

Review Comment:
   This should check op.tryOwnership() first.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -222,6 +223,27 @@ public void handleEndTxnOnSubscriptionResponse(long requestId,
         }
     }
 
+    private void loopCheckOutstandingRequestIfTimeout() {
+        while (true) {
+            Map.Entry<Long, OpRequestSend> entry = outstandingRequests.firstEntry();
+            if (entry == null) {
+                timer.newTimeout(task -> loopCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);
+                return;
+            }
+            OpRequestSend op = entry.getValue();
+            if (op.cb.isDone() || (System.currentTimeMillis() - op.createdAt) < operationTimeoutInMills
+                    // when request satisfy timeout condition, we should try to acquire ownership
+                    || !op.tryOwnership()) {
+                timer.newTimeout(task -> loopCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);

Review Comment:
   Replace the fixed 100 with a delay derived from System.currentTimeMillis() - op.createdAt; that gives the correct time for the next check.
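
   One possible reading of this suggestion, as a sketch rather than the PR's code: System.currentTimeMillis() - op.createdAt is the elapsed time, so the delay until the oldest request expires is the timeout minus that value. The helper name nextCheckDelayMillis and the 1 ms floor are assumptions made for illustration.

```java
/** Hypothetical helper, not part of the PR. */
final class TimeoutDelays {
    private TimeoutDelays() {
    }

    /**
     * @param nowMillis       current time, e.g. System.currentTimeMillis()
     * @param createdAtMillis creation time of the oldest outstanding request
     * @param timeoutMillis   operation timeout (operationTimeoutInMills in the PR)
     * @return milliseconds until that request expires, never less than 1
     */
    static long nextCheckDelayMillis(long nowMillis, long createdAtMillis, long timeoutMillis) {
        long elapsed = nowMillis - createdAtMillis;
        long remaining = timeoutMillis - elapsed;
        // Assumption: clamp to 1 ms so an already-expired request is checked promptly.
        return Math.max(1L, remaining);
    }
}
```

   The result could then be passed as the delay to timer.newTimeout(...) in place of the constant 100.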



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -222,6 +223,27 @@ public void handleEndTxnOnSubscriptionResponse(long requestId,
         }
     }
 
+    private void loopCheckOutstandingRequestIfTimeout() {
+        while (true) {
+            Map.Entry<Long, OpRequestSend> entry = outstandingRequests.firstEntry();
+            if (entry == null) {
+                timer.newTimeout(task -> loopCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);
+                return;
+            }
+            OpRequestSend op = entry.getValue();
+            if (op.cb.isDone() || (System.currentTimeMillis() - op.createdAt) < operationTimeoutInMills
+                    // when request satisfy timeout condition, we should try to acquire ownership
+                    || !op.tryOwnership()) {

Review Comment:
   How about removing the entry first and then calling tryOwnership()?
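
   A rough sketch of the remove-then-tryOwnership order, under stated assumptions. PendingOp and TimeoutSweep are hypothetical stand-ins for OpRequestSend and the handler, ownership is modelled with an AtomicBoolean, and request IDs are assumed to increase with creation time so that the first entry is the oldest. It also assumes the response path does not rely on the entry still being in the map.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for OpRequestSend. */
final class PendingOp {
    final long createdAt;
    final CompletableFuture<Void> cb = new CompletableFuture<>();
    private final AtomicBoolean owned = new AtomicBoolean(false);

    PendingOp(long createdAt) {
        this.createdAt = createdAt;
    }

    /** Only one caller (timeout path or response path) wins ownership. */
    boolean tryOwnership() {
        return owned.compareAndSet(false, true);
    }
}

final class TimeoutSweep {
    private TimeoutSweep() {
    }

    /** Fails every expired request at the head of the map; returns how many were timed out. */
    static int expire(ConcurrentSkipListMap<Long, PendingOp> outstanding, long now, long timeoutMillis) {
        int timedOut = 0;
        while (true) {
            Map.Entry<Long, PendingOp> entry = outstanding.firstEntry();
            if (entry == null) {
                return timedOut;
            }
            PendingOp op = entry.getValue();
            if (!op.cb.isDone() && now - op.createdAt < timeoutMillis) {
                // The oldest request has not expired yet, so nothing behind it has either.
                return timedOut;
            }
            // Remove first: the head always advances, even if ownership is lost below.
            outstanding.remove(entry.getKey(), op);
            if (!op.cb.isDone() && op.tryOwnership()) {
                op.cb.completeExceptionally(new TimeoutException("request " + entry.getKey() + " timed out"));
                timedOut++;
            }
        }
    }
}
```

   Because the entry is removed before ownership is attempted, a request whose ownership was already taken elsewhere cannot keep the loop stuck on the same head entry.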



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -222,6 +223,27 @@ public void handleEndTxnOnSubscriptionResponse(long requestId,
         }
     }
 
+    private void loopCheckOutstandingRequestIfTimeout() {
+        while (true) {
+            Map.Entry<Long, OpRequestSend> entry = outstandingRequests.firstEntry();
+            if (entry == null) {
+                timer.newTimeout(task -> loopCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);
+                return;
+            }
+            OpRequestSend op = entry.getValue();
+            if (op.cb.isDone() || (System.currentTimeMillis() - op.createdAt) < operationTimeoutInMills
+                    // when request satisfy timeout condition, we should try to acquire ownership
+                    || !op.tryOwnership()) {
+                timer.newTimeout(task -> loopCheckOutstandingRequestIfTimeout(), 100, TimeUnit.MILLISECONDS);

Review Comment:
   If op.tryOwnership() returns false many times in a row, the timeout may fire at the wrong time.


