You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/28 10:00:24 UTC

[pulsar] branch master updated: [Transaction] Fix transaction buffer handler not releasing the semaphore. (#10412)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fef5a4f  [Transaction] Fix transaction buffer handler not releasing the semaphore. (#10412)
fef5a4f is described below

commit fef5a4f0d04148e50f4a046d0de358cf875a83c3
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Apr 28 17:59:05 2021 +0800

    [Transaction] Fix transaction buffer handler not releasing the semaphore. (#10412)
    
    Co-authored-by: congbo <co...@github.com>
---
 .../buffer/impl/TransactionBufferHandlerImpl.java     |  4 ++--
 .../buffer/TransactionBufferClientTest.java           | 19 +++++++++++++++++++
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index a7f7f7c..8617688 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -168,7 +168,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
             cache.invalidate(op.topic);
             op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), response.getMessage()));
         }
-        op.recycle();
+        onResponse(op);
     }
 
     @Override
@@ -195,7 +195,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
             cache.invalidate(op.topic);
             op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), response.getMessage()));
         }
-        op.recycle();
+        onResponse(op);
     }
 
     private boolean canSendRequest(CompletableFuture<?> callback) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index c035b6a..069dadb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
@@ -262,4 +263,22 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
         tbClient.abortTxnOnTopic(topic + "_abort_topic", 1L, 1L, -1L).get();
         tbClient.commitTxnOnTopic(topic + "_commit_topic", 1L, 1L, -1L).get();
     }
+
+    @Test
+    public void testTransactionBufferHandlerSemaphore() throws Exception {
+
+        Field field = TransactionBufferClientImpl.class.getDeclaredField("tbHandler");
+        field.setAccessible(true);
+        TransactionBufferHandlerImpl transactionBufferHandler = (TransactionBufferHandlerImpl) field.get(tbClient);
+
+        field = TransactionBufferHandlerImpl.class.getDeclaredField("semaphore");
+        field.setAccessible(true);
+        field.set(transactionBufferHandler, new Semaphore(2));
+
+        String topic = "persistent://" + namespace + "/testTransactionBufferLookUp";
+        tbClient.abortTxnOnSubscription(topic + "_abort_sub", "test", 1L, 1L, -1L).get();
+        tbClient.abortTxnOnTopic(topic + "_abort_topic", 1L, 1L, -1L).get();
+        tbClient.commitTxnOnSubscription(topic + "_commit_sub", "test", 1L, 1L, -1L).get();
+        tbClient.commitTxnOnTopic(topic + "_commit_topic", 1L, 1L, -1L).get();
+    }
 }