You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/30 12:54:51 UTC

[GitHub] [pulsar] congbobo184 commented on a change in pull request #14894: [fix][transaction] avoid too many ServiceUnitNotReadyException for transaction buffer handler

congbobo184 commented on a change in pull request #14894:
URL: https://github.com/apache/pulsar/pull/14894#discussion_r838472624



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -87,7 +96,18 @@ public TransactionBufferHandlerImpl(PulsarClient pulsarClient,
         long requestId = requestIdGenerator.getAndIncrement();
         ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, txnIdMostBits,
                 topic, action, lowWaterMark);
-        return endTxn(requestId, topic, cmd, cb);
+
+        try {

Review comment:
       This try-catch can be deleted, along with all the similar ones below.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -102,79 +122,101 @@ public TransactionBufferHandlerImpl(PulsarClient pulsarClient,
         long requestId = requestIdGenerator.getAndIncrement();
         ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, txnIdLeastBits, txnIdMostBits,
                 topic, subscription, action, lowWaterMark);
-        return endTxn(requestId, topic, cmd, cb);
-    }
-
-    private CompletableFuture<TxnID> endTxn(long requestId, String topic, ByteBuf cmd, CompletableFuture<TxnID> cb) {
-        OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb);
         try {
-            cache.get(topic).whenComplete((clientCnx, throwable) -> {
-                if (throwable == null) {
-                    if (clientCnx.ctx().channel().isActive()) {
-                        clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
-                        pendingRequests.put(requestId, op);
-                        timer.newTimeout(timeout -> {
-                            OpRequestSend peek = pendingRequests.remove(requestId);
-                            if (peek != null && !peek.cb.isDone() && !peek.cb.isCompletedExceptionally()) {
-                                peek.cb.completeExceptionally(new TransactionBufferClientException
-                                        .RequestTimeoutException());
-                                onResponse(peek);
-                            }
-                        }, operationTimeoutInMills, TimeUnit.MILLISECONDS);
-                        cmd.retain();
-                        clientCnx.ctx().writeAndFlush(cmd, clientCnx.ctx().voidPromise());
-                    } else {
-                        cache.invalidate(topic);
-                        cb.completeExceptionally(
-                                new PulsarClientException.LookupException(topic + " endTxn channel is not active"));
-                        op.recycle();
-                    }
-                } else {
-                    log.error("endTxn error topic: [{}]", topic, throwable);
-                    cache.invalidate(topic);
-                    cb.completeExceptionally(
-                            new PulsarClientException.LookupException(throwable.getMessage()));
-                    op.recycle();
-                }
-            });
+            OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, lookupCache.get(topic));
+            if (checkRequestCredits(op)) {
+                endTxn(op);
+            }
         } catch (ExecutionException e) {
-            log.error("endTxn channel is not active exception", e);
-            cache.invalidate(topic);
+            log.error("[{}] failed to get client cnx from lookup cache", topic, e);
+            lookupCache.invalidate(topic);
             cb.completeExceptionally(new PulsarClientException.LookupException(e.getCause().getMessage()));
-            op.recycle();
         }
         return cb;
     }
 
+    private boolean checkRequestCredits(OpRequestSend op) {
+        int currentPermits = REQUEST_CREDITS_UPDATER.get(this);
+        if (currentPermits > 0 && pendingRequests.peek() == null) {
+            if (REQUEST_CREDITS_UPDATER.compareAndSet(this, currentPermits, currentPermits - 1)) {
+                return true;
+            } else {
+                return checkRequestCredits(op);
+            }
+        } else {
+            pendingRequests.add(op);
+            return false;
+        }
+    }
+
+    public void endTxn(OpRequestSend op) {
+        op.cnx.whenComplete((clientCnx, throwable) -> {
+            if (throwable == null) {
+                if (clientCnx.ctx().channel().isActive()) {
+                    clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
+                    outstandingRequests.put(op.requestId, op);
+                    timer.newTimeout(timeout -> {
+                        OpRequestSend peek = outstandingRequests.remove(op.requestId);
+                        if (peek != null && !peek.cb.isDone() && !peek.cb.isCompletedExceptionally()) {
+                            peek.cb.completeExceptionally(new TransactionBufferClientException
+                                    .RequestTimeoutException());
+                            onResponse(peek);
+                        }
+                    }, operationTimeoutInMills, TimeUnit.MILLISECONDS);
+                    op.cmd.retain();
+                    clientCnx.ctx().writeAndFlush(op.cmd, clientCnx.ctx().voidPromise());
+                } else {
+                    invalidateLookupCache(op);
+                    op.cb.completeExceptionally(
+                            new PulsarClientException.LookupException(op.topic + " endTxn channel is not active"));
+                    onResponse(op);
+                }
+            } else {
+                log.error("endTxn error topic: [{}]", op.topic, throwable);
+                invalidateLookupCache(op);
+                op.cb.completeExceptionally(
+                        new PulsarClientException.LookupException(throwable.getMessage()));
+                onResponse(op);
+            }
+        });
+    }
+
     @Override
     public void handleEndTxnOnTopicResponse(long requestId, CommandEndTxnOnPartitionResponse response) {
-        OpRequestSend op = pendingRequests.remove(requestId);
+        OpRequestSend op = outstandingRequests.remove(requestId);
         if (op == null) {
             if (log.isDebugEnabled()) {
                 log.debug("Got end txn on topic response for timeout {} - {}", response.getTxnidMostBits(),
                         response.getTxnidLeastBits());
             }
             return;
         }
-
-        if (!response.hasError()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Got end txn on topic response for for request {}", op.topic, response.getRequestId());
+        try {
+            if (!response.hasError()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Got end txn on topic response for for request {}", op.topic,
+                            response.getRequestId());
+                }
+                op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()));
+            } else {
+                log.error("[{}] Got end txn on topic response for request {} error {}", op.topic,
+                        response.getRequestId(),
+                        response.getError());
+                invalidateLookupCache(op);
+                op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
+                        response.getMessage()));
             }
-            op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()));
-        } else {
-            log.error("[{}] Got end txn on topic response for request {} error {}", op.topic, response.getRequestId(),
-                    response.getError());
-            cache.invalidate(op.topic);
-            op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), response.getMessage()));
+        } catch (Exception e) {

Review comment:
       Do we need this try-catch?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -183,41 +225,98 @@ public void handleEndTxnOnSubscriptionResponse(long requestId,
             return;
         }
 
-        if (!response.hasError()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Got end txn on subscription response for for request {}",
-                        op.topic, response.getRequestId());
+        try {
+            if (!response.hasError()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Got end txn on subscription response for for request {}",
+                            op.topic, response.getRequestId());
+                }
+                op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()));
+            } else {
+                log.error("[{}] Got end txn on subscription response for request {} error {}",
+                        op.topic, response.getRequestId(), response.getError());
+                invalidateLookupCache(op);
+                op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
+                        response.getMessage()));
             }
-            op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()));
-        } else {
-            log.error("[{}] Got end txn on subscription response for request {} error {}",
-                    op.topic, response.getRequestId(), response.getError());
-            cache.invalidate(op.topic);
-            op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), response.getMessage()));
+        } catch (Exception e) {
+            log.error("[{}] Got exception when complete EndTxnOnSub op for request {}", op.topic, e);
+        } finally {
+            onResponse(op);
         }
-        onResponse(op);
     }
 
-    void onResponse(OpRequestSend op) {
-        ReferenceCountUtil.safeRelease(op.byteBuf);
-        op.recycle();
+    public void onResponse(OpRequestSend op) {
+        REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+        if (op != null) {
+            ReferenceCountUtil.safeRelease(op.cmd);
+            op.recycle();
+        }
+        checkPendingRequests();
     }
 
-    private static final class OpRequestSend {
+    private void checkPendingRequests() {
+        while (true) {
+            int permits = REQUEST_CREDITS_UPDATER.get(this);
+            if (permits > 0 && pendingRequests.peek() != null) {
+                if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, permits - 1)) {
+                    OpRequestSend polled = pendingRequests.poll();
+                    if (polled != null) {
+                        try {
+                            if (polled.cnx != lookupCache.get(polled.topic)) {
+                                OpRequestSend invalid = polled;
+                                polled = OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
+                                        lookupCache.get(invalid.topic));
+                                invalid.recycle();
+                            }
+                            endTxn(polled);
+                        } catch (ExecutionException e) {
+                            log.error("[{}] failed to get client cnx from lookup cache", polled.topic, e);
+                            lookupCache.invalidate(polled.topic);
+                            polled.cb.completeExceptionally(new PulsarClientException.LookupException(
+                                    e.getCause().getMessage()));
+                            REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+                        }
+                    } else {
+                        REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+                    }
+                } else {
+                    checkPendingRequests();

Review comment:
       A plain `continue;` is enough here; the enclosing `while (true)` loop already retries, so the recursive call is unnecessary.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org