You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/10 04:04:33 UTC
[pulsar] 10/13: [Transaction] Optimize transaction timeout (#14172)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit de5dec4239fbd1c3a595e013a81f262059444489
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Wed Feb 9 15:34:25 2022 +0800
[Transaction] Optimize transaction timeout (#14172)
(cherry picked from commit 5ee210a5a12573bf8d047962bbac82528091216c)
---
.../pulsar/broker/transaction/TransactionTest.java | 27 +++++++++++++++++++++-
.../impl/transaction/TransactionBuilderImpl.java | 4 +---
.../client/impl/transaction/TransactionImpl.java | 6 +++++
3 files changed, 33 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 4aaadb2..b627438 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -34,6 +34,7 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.Unpooled;
+import io.netty.util.Timeout;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashMap;
@@ -774,4 +775,28 @@ public class TransactionTest extends TransactionTestBase {
}));
completableFuture.get(5, TimeUnit.SECONDS);
}
-}
+
+ @Test // Checks that the transaction's "timeout" handle reports cancelled after commit and after abort.
+ public void testCancelTxnTimeout() throws Exception{
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(10, TimeUnit.SECONDS)
+ .build()
+ .get();
+ // Case 1: commit the transaction and block until the commit future completes.
+ transaction.commit().get();
+ // "timeout" is declared private in TransactionImpl, so the test reads it through reflection.
+ Field field = TransactionImpl.class.getDeclaredField("timeout");
+ field.setAccessible(true);
+ Timeout timeout = (Timeout) field.get(transaction);
+ Assert.assertTrue(timeout.isCancelled());
+ // Case 2: a fresh transaction that is aborted instead of committed.
+ transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(10, TimeUnit.SECONDS)
+ .build()
+ .get();
+ // Block until the abort future completes before inspecting the timer handle.
+ transaction.abort().get();
+ timeout = (Timeout) field.get(transaction); // same Field handle, read from the new transaction instance
+ Assert.assertTrue(timeout.isCancelled());
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
index 3ac8676..9878264 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
@@ -69,10 +69,8 @@ public class TransactionBuilderImpl implements TransactionBuilder {
future.completeExceptionally(throwable);
return;
}
- TransactionImpl transaction = new TransactionImpl(client, txnTimeout,
+ TransactionImpl transaction = new TransactionImpl(client, timeUnit.toMillis(txnTimeout),
txnID.getLeastSigBits(), txnID.getMostSigBits());
- client.getTimer().newTimeout(transaction,
- txnTimeout, timeUnit);
future.complete(transaction);
});
return future;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index ebcb20e..8adc162 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.Lists;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -68,6 +69,7 @@ public class TransactionImpl implements Transaction , TimerTask {
private volatile State state;
private static final AtomicReferenceFieldUpdater<TransactionImpl, State> STATE_UPDATE =
AtomicReferenceFieldUpdater.newUpdater(TransactionImpl.class, State.class, "state");
+ private final Timeout timeout;
@Override
public void run(Timeout timeout) throws Exception {
@@ -100,6 +102,8 @@ public class TransactionImpl implements Transaction , TimerTask {
this.sendFutureList = new ArrayList<>();
this.ackFutureList = new ArrayList<>();
+ this.timeout = client.getTimer().newTimeout(this, transactionTimeoutMs, TimeUnit.MILLISECONDS);
+
}
// register the topics that will be modified by this transaction
@@ -164,6 +168,7 @@ public class TransactionImpl implements Transaction , TimerTask {
return checkIfOpenOrCommitting().thenCompose((value) -> {
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
this.state = State.COMMITTING;
+ timeout.cancel();
allOpComplete().whenComplete((v, e) -> {
if (e != null) {
abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e));
@@ -192,6 +197,7 @@ public class TransactionImpl implements Transaction , TimerTask {
return checkIfOpenOrAborting().thenCompose(value -> {
CompletableFuture<Void> abortFuture = new CompletableFuture<>();
this.state = State.ABORTING;
+ timeout.cancel();
allOpComplete().whenComplete((v, e) -> {
if (e != null) {
log.error(e.getMessage());