You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/28 10:00:24 UTC
[pulsar] branch master updated: [Transaction] Fix transaction buffer handler don't release semaphore problem. (#10412)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fef5a4f [Transaction] Fix transaction buffer handler don't release semaphore problem. (#10412)
fef5a4f is described below
commit fef5a4f0d04148e50f4a046d0de358cf875a83c3
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Apr 28 17:59:05 2021 +0800
[Transaction] Fix transaction buffer handler don't release semaphore problem. (#10412)
Co-authored-by: congbo <co...@github.com>
---
.../buffer/impl/TransactionBufferHandlerImpl.java | 4 ++--
.../buffer/TransactionBufferClientTest.java | 19 +++++++++++++++++++
2 files changed, 21 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index a7f7f7c..8617688 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -168,7 +168,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
cache.invalidate(op.topic);
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), response.getMessage()));
}
- op.recycle();
+ onResponse(op);
}
@Override
@@ -195,7 +195,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
cache.invalidate(op.topic);
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), response.getMessage()));
}
- op.recycle();
+ onResponse(op);
}
private boolean canSendRequest(CompletableFuture<?> callback) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index c035b6a..069dadb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
@@ -262,4 +263,22 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
tbClient.abortTxnOnTopic(topic + "_abort_topic", 1L, 1L, -1L).get();
tbClient.commitTxnOnTopic(topic + "_commit_topic", 1L, 1L, -1L).get();
}
+
+ @Test
+ public void testTransactionBufferHandlerSemaphore() throws Exception {
+
+ Field field = TransactionBufferClientImpl.class.getDeclaredField("tbHandler");
+ field.setAccessible(true);
+ TransactionBufferHandlerImpl transactionBufferHandler = (TransactionBufferHandlerImpl) field.get(tbClient);
+
+ field = TransactionBufferHandlerImpl.class.getDeclaredField("semaphore");
+ field.setAccessible(true);
+ field.set(transactionBufferHandler, new Semaphore(2));
+
+ String topic = "persistent://" + namespace + "/testTransactionBufferLookUp";
+ tbClient.abortTxnOnSubscription(topic + "_abort_sub", "test", 1L, 1L, -1L).get();
+ tbClient.abortTxnOnTopic(topic + "_abort_topic", 1L, 1L, -1L).get();
+ tbClient.commitTxnOnSubscription(topic + "_commit_sub", "test", 1L, 1L, -1L).get();
+ tbClient.commitTxnOnTopic(topic + "_commit_topic", 1L, 1L, -1L).get();
+ }
}