You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/06 07:33:16 UTC
[pulsar] branch master updated: [improve][transaction] support configurable ``transactionBufferClientOperationTimeoutInMills`` (#15011)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1c6ea125d62 [improve][transaction] support configurable ``transactionBufferClientOperationTimeoutInMills`` (#15011)
1c6ea125d62 is described below
commit 1c6ea125d629beb40bd5f381002f22619edf43df
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Wed Apr 6 15:33:08 2022 +0800
[improve][transaction] support configurable ``transactionBufferClientOperationTimeoutInMills`` (#15011)
---
.../main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 6 ++++++
.../src/main/java/org/apache/pulsar/broker/PulsarService.java | 3 ++-
.../broker/transaction/buffer/impl/TransactionBufferClientImpl.java | 4 ++--
.../transaction/buffer/impl/TransactionBufferHandlerImpl.java | 6 +++---
.../broker/transaction/buffer/TransactionBufferClientTest.java | 6 +++---
.../broker/transaction/buffer/TransactionBufferHandlerImplTest.java | 6 ++++--
6 files changed, 20 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 86604f87def..355e252db19 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2489,6 +2489,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int transactionBufferClientMaxConcurrentRequests = 1000;
+ @FieldContext(
+ category = CATEGORY_TRANSACTION,
+ doc = "The transaction buffer client's operation timeout in milliseconds."
+ )
+ private long transactionBufferClientOperationTimeoutInMills = 3000L;
+
/**** --- KeyStore TLS config variables. --- ****/
@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 93fc75e0ff5..b060cbbcfcf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -752,7 +752,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
transactionBufferClient = TransactionBufferClientImpl.create(getClient(), transactionTimer,
- config.getTransactionBufferClientMaxConcurrentRequests());
+ config.getTransactionBufferClientMaxConcurrentRequests(),
+ config.getTransactionBufferClientOperationTimeoutInMills());
transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
.newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
index 454e9a6d53b..060476e573c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
@@ -40,9 +40,9 @@ public class TransactionBufferClientImpl implements TransactionBufferClient {
}
public static TransactionBufferClient create(PulsarClient pulsarClient, HashedWheelTimer timer,
- int maxConcurrentRequests) {
+ int maxConcurrentRequests, long operationTimeoutInMills) {
TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarClient, timer,
- maxConcurrentRequests);
+ maxConcurrentRequests, operationTimeoutInMills);
return new TransactionBufferClientImpl(handler);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index 6ea53a3edd2..b80a273bc6f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -75,12 +75,12 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
}
});
- public TransactionBufferHandlerImpl(PulsarClient pulsarClient,
- HashedWheelTimer timer, int maxConcurrentRequests) {
+ public TransactionBufferHandlerImpl(PulsarClient pulsarClient, HashedWheelTimer timer,
+ int maxConcurrentRequests, long operationTimeoutInMills) {
this.pulsarClient = pulsarClient;
this.outstandingRequests = new ConcurrentSkipListMap<>();
this.pendingRequests = new GrowableArrayBlockingQueue<>();
- this.operationTimeoutInMills = 3000L;
+ this.operationTimeoutInMills = operationTimeoutInMills;
this.timer = timer;
this.requestCredits = Math.max(100, maxConcurrentRequests);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index fa1b9e7f287..d85f6b42f23 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -81,7 +81,7 @@ public class TransactionBufferClientTest extends TransactionTestBase {
admin.namespaces().createNamespace(namespace, 10);
admin.topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(), partitions);
tbClient = TransactionBufferClientImpl.create(pulsarClient,
- new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")), 1000);
+ new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")), 1000, 3000);
}
@Override
@@ -160,7 +160,7 @@ public class TransactionBufferClientTest extends TransactionTestBase {
@Cleanup("stop")
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
TransactionBufferHandlerImpl transactionBufferHandler =
- new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000);
+ new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000, 3000);
CompletableFuture<TxnID> endFuture =
transactionBufferHandler.endTxnOnTopic("test", 1, 1, TxnAction.ABORT, 1);
@@ -203,7 +203,7 @@ public class TransactionBufferClientTest extends TransactionTestBase {
@Cleanup("stop")
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
TransactionBufferHandlerImpl transactionBufferHandler =
- new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000);
+ new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000, 3000);
try {
transactionBufferHandler.endTxnOnTopic("test", 1, 1, TxnAction.ABORT, 1).get();
fail();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
index ef0cf037772..5241342635b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
@@ -42,7 +42,8 @@ public class TransactionBufferHandlerImplTest {
public void testRequestCredits() {
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
when(pulsarClient.getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
- TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(pulsarClient, null, 1000));
+ TransactionBufferHandlerImpl handler = spy(
+ new TransactionBufferHandlerImpl(pulsarClient, null, 1000, 3000));
doNothing().when(handler).endTxn(any());
for (int i = 0; i < 500; i++) {
handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L);
@@ -61,7 +62,8 @@ public class TransactionBufferHandlerImplTest {
@Test
public void testMinRequestCredits() {
- TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(null, null, 50));
+ TransactionBufferHandlerImpl handler = spy(
+ new TransactionBufferHandlerImpl(null, null, 50, 3000));
assertEquals(handler.getAvailableRequestCredits(), 100);
}
}