You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/19 09:51:14 UTC

[pulsar] 12/26: [improve][transaction] support configurable ``transactionBufferClientOperationTimeoutInMills`` (#15011)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2c765d0d76e181ff5ce470295f96a16cfb307ad9
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Wed Apr 6 15:33:08 2022 +0800

    [improve][transaction] support configurable ``transactionBufferClientOperationTimeoutInMills`` (#15011)
    
    (cherry picked from commit 1c6ea125d629beb40bd5f381002f22619edf43df)
---
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java    | 6 ++++++
 .../src/main/java/org/apache/pulsar/broker/PulsarService.java       | 3 ++-
 .../broker/transaction/buffer/impl/TransactionBufferClientImpl.java | 4 ++--
 .../transaction/buffer/impl/TransactionBufferHandlerImpl.java       | 6 +++---
 .../broker/transaction/buffer/TransactionBufferClientTest.java      | 6 +++---
 .../broker/transaction/buffer/TransactionBufferHandlerImplTest.java | 6 ++++--
 6 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 4b3b2e17831..fa8464376fc 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2222,6 +2222,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private int transactionBufferClientMaxConcurrentRequests = 1000;
 
+    @FieldContext(
+            category = CATEGORY_TRANSACTION,
+            doc = "The transaction buffer client's operation timeout in milliseconds."
+    )
+    private long transactionBufferClientOperationTimeoutInMills = 3000L;
+
     /**** --- KeyStore TLS config variables --- ****/
     @FieldContext(
             category = CATEGORY_KEYSTORE_TLS,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ac4759d7948..6451e518b88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -733,7 +733,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 this.transactionTimer =
                         new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
                 transactionBufferClient = TransactionBufferClientImpl.create(getClient(), transactionTimer,
-                        config.getTransactionBufferClientMaxConcurrentRequests());
+                        config.getTransactionBufferClientMaxConcurrentRequests(),
+                        config.getTransactionBufferClientOperationTimeoutInMills());
 
                 transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
                         .newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
index 454e9a6d53b..060476e573c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
@@ -40,9 +40,9 @@ public class TransactionBufferClientImpl implements TransactionBufferClient {
     }
 
     public static TransactionBufferClient create(PulsarClient pulsarClient, HashedWheelTimer timer,
-             int maxConcurrentRequests) {
+             int maxConcurrentRequests, long operationTimeoutInMills) {
         TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarClient, timer,
-                maxConcurrentRequests);
+                maxConcurrentRequests, operationTimeoutInMills);
         return new TransactionBufferClientImpl(handler);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index 6ea53a3edd2..b80a273bc6f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -75,12 +75,12 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
                 }
             });
 
-    public TransactionBufferHandlerImpl(PulsarClient pulsarClient,
-                                        HashedWheelTimer timer, int maxConcurrentRequests) {
+    public TransactionBufferHandlerImpl(PulsarClient pulsarClient, HashedWheelTimer timer,
+                                        int maxConcurrentRequests, long operationTimeoutInMills) {
         this.pulsarClient = pulsarClient;
         this.outstandingRequests = new ConcurrentSkipListMap<>();
         this.pendingRequests = new GrowableArrayBlockingQueue<>();
-        this.operationTimeoutInMills = 3000L;
+        this.operationTimeoutInMills = operationTimeoutInMills;
         this.timer = timer;
         this.requestCredits = Math.max(100, maxConcurrentRequests);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index fa1b9e7f287..d85f6b42f23 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -81,7 +81,7 @@ public class TransactionBufferClientTest extends TransactionTestBase {
         admin.namespaces().createNamespace(namespace, 10);
         admin.topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(), partitions);
         tbClient = TransactionBufferClientImpl.create(pulsarClient,
-                new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")), 1000);
+                new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")), 1000, 3000);
     }
 
     @Override
@@ -160,7 +160,7 @@ public class TransactionBufferClientTest extends TransactionTestBase {
         @Cleanup("stop")
         HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
         TransactionBufferHandlerImpl transactionBufferHandler =
-                new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000);
+                new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000, 3000);
         CompletableFuture<TxnID> endFuture =
                 transactionBufferHandler.endTxnOnTopic("test", 1, 1, TxnAction.ABORT, 1);
 
@@ -203,7 +203,7 @@ public class TransactionBufferClientTest extends TransactionTestBase {
         @Cleanup("stop")
         HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
         TransactionBufferHandlerImpl transactionBufferHandler =
-                new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000);
+                new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000, 3000);
         try {
             transactionBufferHandler.endTxnOnTopic("test", 1, 1, TxnAction.ABORT, 1).get();
             fail();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
index ef0cf037772..5241342635b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
@@ -42,7 +42,8 @@ public class TransactionBufferHandlerImplTest {
     public void testRequestCredits() {
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
         when(pulsarClient.getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
-        TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(pulsarClient, null, 1000));
+        TransactionBufferHandlerImpl handler = spy(
+                new TransactionBufferHandlerImpl(pulsarClient, null, 1000, 3000));
         doNothing().when(handler).endTxn(any());
         for (int i = 0; i < 500; i++) {
             handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L);
@@ -61,7 +62,8 @@ public class TransactionBufferHandlerImplTest {
 
     @Test
     public void testMinRequestCredits() {
-        TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(null, null, 50));
+        TransactionBufferHandlerImpl handler = spy(
+                new TransactionBufferHandlerImpl(null, null, 50, 3000));
         assertEquals(handler.getAvailableRequestCredits(), 100);
     }
 }