You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/30 13:30:50 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #14894: [fix][transaction] avoid too many ServiceUnitNotReadyException for transaction buffer handler

codelipenghui commented on a change in pull request #14894:
URL: https://github.com/apache/pulsar/pull/14894#discussion_r838543270



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -87,7 +96,18 @@ public TransactionBufferHandlerImpl(PulsarClient pulsarClient,
         long requestId = requestIdGenerator.getAndIncrement();
         ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, txnIdMostBits,
                 topic, action, lowWaterMark);
-        return endTxn(requestId, topic, cmd, cb);
+
+        try {

Review comment:
       lookupCache.get(topic) can throw an ExecutionException, which means an error occurred in the lookup cache; we need to catch the exception and invalidate the cache.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -102,79 +122,101 @@ public TransactionBufferHandlerImpl(PulsarClient pulsarClient,
         long requestId = requestIdGenerator.getAndIncrement();
         ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, txnIdLeastBits, txnIdMostBits,
                 topic, subscription, action, lowWaterMark);
-        return endTxn(requestId, topic, cmd, cb);
-    }
-
-    private CompletableFuture<TxnID> endTxn(long requestId, String topic, ByteBuf cmd, CompletableFuture<TxnID> cb) {
-        OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb);
         try {
-            cache.get(topic).whenComplete((clientCnx, throwable) -> {
-                if (throwable == null) {
-                    if (clientCnx.ctx().channel().isActive()) {
-                        clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
-                        pendingRequests.put(requestId, op);
-                        timer.newTimeout(timeout -> {
-                            OpRequestSend peek = pendingRequests.remove(requestId);
-                            if (peek != null && !peek.cb.isDone() && !peek.cb.isCompletedExceptionally()) {
-                                peek.cb.completeExceptionally(new TransactionBufferClientException
-                                        .RequestTimeoutException());
-                                onResponse(peek);
-                            }
-                        }, operationTimeoutInMills, TimeUnit.MILLISECONDS);
-                        cmd.retain();
-                        clientCnx.ctx().writeAndFlush(cmd, clientCnx.ctx().voidPromise());
-                    } else {
-                        cache.invalidate(topic);
-                        cb.completeExceptionally(
-                                new PulsarClientException.LookupException(topic + " endTxn channel is not active"));
-                        op.recycle();
-                    }
-                } else {
-                    log.error("endTxn error topic: [{}]", topic, throwable);
-                    cache.invalidate(topic);
-                    cb.completeExceptionally(
-                            new PulsarClientException.LookupException(throwable.getMessage()));
-                    op.recycle();
-                }
-            });
+            OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, lookupCache.get(topic));
+            if (checkRequestCredits(op)) {
+                endTxn(op);
+            }
         } catch (ExecutionException e) {
-            log.error("endTxn channel is not active exception", e);
-            cache.invalidate(topic);
+            log.error("[{}] failed to get client cnx from lookup cache", topic, e);
+            lookupCache.invalidate(topic);
             cb.completeExceptionally(new PulsarClientException.LookupException(e.getCause().getMessage()));
-            op.recycle();
         }
         return cb;
     }
 
+    private boolean checkRequestCredits(OpRequestSend op) {
+        int currentPermits = REQUEST_CREDITS_UPDATER.get(this);
+        if (currentPermits > 0 && pendingRequests.peek() == null) {
+            if (REQUEST_CREDITS_UPDATER.compareAndSet(this, currentPermits, currentPermits - 1)) {
+                return true;
+            } else {
+                return checkRequestCredits(op);
+            }
+        } else {
+            pendingRequests.add(op);
+            return false;
+        }
+    }
+
+    public void endTxn(OpRequestSend op) {
+        op.cnx.whenComplete((clientCnx, throwable) -> {
+            if (throwable == null) {
+                if (clientCnx.ctx().channel().isActive()) {
+                    clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
+                    outstandingRequests.put(op.requestId, op);
+                    timer.newTimeout(timeout -> {
+                        OpRequestSend peek = outstandingRequests.remove(op.requestId);
+                        if (peek != null && !peek.cb.isDone() && !peek.cb.isCompletedExceptionally()) {
+                            peek.cb.completeExceptionally(new TransactionBufferClientException
+                                    .RequestTimeoutException());
+                            onResponse(peek);
+                        }
+                    }, operationTimeoutInMills, TimeUnit.MILLISECONDS);
+                    op.cmd.retain();
+                    clientCnx.ctx().writeAndFlush(op.cmd, clientCnx.ctx().voidPromise());
+                } else {
+                    invalidateLookupCache(op);
+                    op.cb.completeExceptionally(
+                            new PulsarClientException.LookupException(op.topic + " endTxn channel is not active"));
+                    onResponse(op);
+                }
+            } else {
+                log.error("endTxn error topic: [{}]", op.topic, throwable);
+                invalidateLookupCache(op);
+                op.cb.completeExceptionally(
+                        new PulsarClientException.LookupException(throwable.getMessage()));
+                onResponse(op);
+            }
+        });
+    }
+
     @Override
     public void handleEndTxnOnTopicResponse(long requestId, CommandEndTxnOnPartitionResponse response) {
-        OpRequestSend op = pendingRequests.remove(requestId);
+        OpRequestSend op = outstandingRequests.remove(requestId);
         if (op == null) {
             if (log.isDebugEnabled()) {
                 log.debug("Got end txn on topic response for timeout {} - {}", response.getTxnidMostBits(),
                         response.getTxnidLeastBits());
             }
             return;
         }
-
-        if (!response.hasError()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Got end txn on topic response for for request {}", op.topic, response.getRequestId());
+        try {
+            if (!response.hasError()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Got end txn on topic response for for request {}", op.topic,
+                            response.getRequestId());
+                }
+                op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()));
+            } else {
+                log.error("[{}] Got end txn on topic response for request {} error {}", op.topic,
+                        response.getRequestId(),
+                        response.getError());
+                invalidateLookupCache(op);
+                op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
+                        response.getMessage()));
             }
-            op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()));
-        } else {
-            log.error("[{}] Got end txn on topic response for request {} error {}", op.topic, response.getRequestId(),
-                    response.getError());
-            cache.invalidate(op.topic);
-            op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), response.getMessage()));
+        } catch (Exception e) {

Review comment:
       I just want to add an error log in case an exception is thrown while completing the callback; the finally block ensures that onResponse() is always called.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -183,41 +225,98 @@ public void handleEndTxnOnSubscriptionResponse(long requestId,
             return;
         }
 
-        if (!response.hasError()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Got end txn on subscription response for for request {}",
-                        op.topic, response.getRequestId());
+        try {
+            if (!response.hasError()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Got end txn on subscription response for for request {}",
+                            op.topic, response.getRequestId());
+                }
+                op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()));
+            } else {
+                log.error("[{}] Got end txn on subscription response for request {} error {}",
+                        op.topic, response.getRequestId(), response.getError());
+                invalidateLookupCache(op);
+                op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
+                        response.getMessage()));
             }
-            op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()));
-        } else {
-            log.error("[{}] Got end txn on subscription response for request {} error {}",
-                    op.topic, response.getRequestId(), response.getError());
-            cache.invalidate(op.topic);
-            op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), response.getMessage()));
+        } catch (Exception e) {
+            log.error("[{}] Got exception when complete EndTxnOnSub op for request {}", op.topic, e);
+        } finally {
+            onResponse(op);
         }
-        onResponse(op);
     }
 
-    void onResponse(OpRequestSend op) {
-        ReferenceCountUtil.safeRelease(op.byteBuf);
-        op.recycle();
+    public void onResponse(OpRequestSend op) {
+        REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+        if (op != null) {
+            ReferenceCountUtil.safeRelease(op.cmd);
+            op.recycle();
+        }
+        checkPendingRequests();
     }
 
-    private static final class OpRequestSend {
+    private void checkPendingRequests() {
+        while (true) {
+            int permits = REQUEST_CREDITS_UPDATER.get(this);
+            if (permits > 0 && pendingRequests.peek() != null) {
+                if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, permits - 1)) {
+                    OpRequestSend polled = pendingRequests.poll();
+                    if (polled != null) {
+                        try {
+                            if (polled.cnx != lookupCache.get(polled.topic)) {
+                                OpRequestSend invalid = polled;
+                                polled = OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
+                                        lookupCache.get(invalid.topic));
+                                invalid.recycle();
+                            }
+                            endTxn(polled);
+                        } catch (ExecutionException e) {
+                            log.error("[{}] failed to get client cnx from lookup cache", polled.topic, e);
+                            lookupCache.invalidate(polled.topic);
+                            polled.cb.completeExceptionally(new PulsarClientException.LookupException(
+                                    e.getCause().getMessage()));
+                            REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+                        }
+                    } else {
+                        REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+                    }
+                } else {
+                    checkPendingRequests();

Review comment:
       Nice catch!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org