You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 03:02:45 UTC
[pulsar] 14/15: [Transaction]Txn client check timeout (#12521)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b244d0c3bc625bb4d4d3108735fd2a219d328a50
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Sat Dec 18 08:35:18 2021 +0800
[Transaction]Txn client check timeout (#12521)
### Motivation
Optimize the logic on the Transaction Client side.
Avoid sending and acking messages with timeout transactions.
### Modifications
* TransactionImp
* Add a tool field for CAS to replace State : STATE_UPDATE.
**When committing and aborted, only the successful cas operation will make subsequent judgments, otherwise it will return a failure future**
* Implement TimerTasker to execute tasks that replace the state of the transaction as Aborted.
* TransactionBuildImpl
* In the callback of the build method, call the timer of PulsarClient to start a Timeout. Pass in the corresponding transactionImpl (TimeTasker has been implemented)
(cherry picked from commit c5d7a84c8e5c27e48022df8c7082496840cd3be9)
---
.../client/impl/ConsumerAckResponseTest.java | 2 +
.../client/impl/TransactionEndToEndTest.java | 67 +++++++++++++++++++++-
.../TransactionCoordinatorClientException.java | 20 +++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 4 ++
.../client/impl/PartitionedProducerImpl.java | 5 ++
.../apache/pulsar/client/impl/ProducerImpl.java | 4 ++
.../pulsar/client/impl/PulsarClientImpl.java | 1 +
.../impl/transaction/TransactionBuilderImpl.java | 9 ++-
.../client/impl/transaction/TransactionImpl.java | 38 +++++++++---
9 files changed, 137 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
index 0378c53..6981865 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
@@ -50,8 +50,10 @@ public class ConsumerAckResponseTest extends ProducerConsumerBase {
super.producerBaseSetup();
doReturn(1L).when(transaction).getTxnIdLeastBits();
doReturn(1L).when(transaction).getTxnIdMostBits();
+ doReturn(TransactionImpl.State.OPEN).when(transaction).getState();
CompletableFuture<Void> completableFuture = CompletableFuture.completedFuture(null);
doNothing().when(transaction).registerAckOp(any());
+ doReturn(true).when(transaction).checkIfOpen(any());
doReturn(completableFuture).when(transaction).registerAckedTopic(any(), any());
Thread.sleep(1000 * 3);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 4630449..f52d319 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -23,7 +23,9 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
+import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
@@ -770,19 +772,46 @@ public class TransactionEndToEndTest extends TransactionTestBase {
}
});
+ Class<TransactionImpl> transactionClass = TransactionImpl.class;
+ Constructor<TransactionImpl> constructor = transactionClass
+ .getDeclaredConstructor(PulsarClientImpl.class, long.class, long.class, long.class);
+ constructor.setAccessible(true);
+
+ TransactionImpl timeoutTxnSkipClientTimeout = constructor.newInstance(pulsarClient, 5,
+ timeoutTxn.getTxnID().getLeastSigBits(), timeoutTxn.getTxnID().getMostSigBits());
+
try {
- timeoutTxn.commit().get();
+ timeoutTxnSkipClientTimeout.commit().get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof TransactionNotFoundException);
}
Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
- TransactionImpl.State state = (TransactionImpl.State) field.get(timeoutTxn);
+ TransactionImpl.State state = (TransactionImpl.State) field.get(timeoutTxnSkipClientTimeout);
assertEquals(state, TransactionImpl.State.ERROR);
}
@Test
+ public void testTxnTimeoutAtTransactionMetadataStore() throws Exception{
+ TxnID txnID = pulsarServiceList.get(0).getTransactionMetadataStoreService()
+ .newTransaction(new TransactionCoordinatorID(0), 1).get();
+ Awaitility.await().until(() -> {
+ try {
+ getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get();
+ return false;
+ } catch (Exception e) {
+ return true;
+ }
+ });
+ Collection<TransactionMetadataStore> transactionMetadataStores =
+ getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values();
+ long timeoutCount = transactionMetadataStores.stream()
+ .mapToLong(store -> store.getMetadataStoreStats().timeoutCount).sum();
+ Assert.assertEquals(timeoutCount, 1);
+ }
+
+ @Test
public void transactionTimeoutTest() throws Exception {
String topic = NAMESPACE1 + "/txn-timeout";
@@ -943,4 +972,38 @@ public class TransactionEndToEndTest extends TransactionTestBase {
}
assertTrue(flag);
}
+
+ @Test
+ public void testTxnTimeOutInClient() throws Exception{
+ String topic = NAMESPACE1 + "/testTxnTimeOutInClient";
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer")
+ .topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).consumerName("testTxnTimeOut_consumer")
+ .topic(topic).subscriptionName("testTxnTimeOut_sub").subscribe();
+
+ Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
+ .build().get();
+ producer.newMessage().send();
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIMEOUT);
+ });
+
+ try {
+ producer.newMessage(transaction).send();
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getCause().getCause() instanceof TransactionCoordinatorClientException
+ .InvalidTxnStatusException);
+ }
+ try {
+ Message<String> message = consumer.receive();
+ consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getCause() instanceof TransactionCoordinatorClientException
+ .InvalidTxnStatusException);
+ }
+ }
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
index 0e1f6c7..d7df4e3 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
@@ -68,6 +68,11 @@ public class TransactionCoordinatorClientException extends IOException {
public InvalidTxnStatusException(String message) {
super(message);
}
+
+ public InvalidTxnStatusException(String txnId, String actualState, String expectState) {
+ super("["+ txnId +"] with unexpected state : "
+ + actualState + ", expect " + expectState + " state!");
+ }
}
/**
@@ -93,6 +98,21 @@ public class TransactionCoordinatorClientException extends IOException {
}
}
+
+ /**
+ * Thrown when transaction meta was timeout.
+ */
+ public static class TransactionTimeotException extends TransactionCoordinatorClientException {
+
+ public TransactionTimeotException(Throwable t) {
+ super(t);
+ }
+
+ public TransactionTimeotException(String transactionId) {
+ super("The transaction " + transactionId + " is timeout.");
+ }
+ }
+
/**
* Thrown when send request to transaction meta store but the transaction meta store handler not ready.
*/
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 1251593..0f47208 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -487,6 +487,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
if (null != txn) {
checkArgument(txn instanceof TransactionImpl);
txnImpl = (TransactionImpl) txn;
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ if (!txnImpl.checkIfOpen(completableFuture)) {
+ return completableFuture;
+ }
}
return doAcknowledgeWithTxn(messageId, AckType.Individual, Collections.emptyMap(), txnImpl);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 3311050..216d775 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -191,6 +192,10 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
@Override
CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
+ CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
+ if (txn != null && !((TransactionImpl)txn).checkIfOpen(completableFuture)) {
+ return completableFuture;
+ }
int partition = routerPolicy.choosePartition(message, topicMetadata);
checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
"Illegal partition index chosen by the message routing policy: " + partition);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index a3ca386..5944c8f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -371,6 +371,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
if (txn == null) {
return internalSendAsync(message);
} else {
+ CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
+ if (!((TransactionImpl)txn).checkIfOpen(completableFuture)) {
+ return completableFuture;
+ }
return ((TransactionImpl) txn).registerProducedTopic(topic)
.thenCompose(ignored -> internalSendAsync(message));
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 7703afc..c5195c9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -95,6 +95,7 @@ public class PulsarClientImpl implements PulsarClient {
protected final ClientConfigurationData conf;
private LookupService lookup;
private final ConnectionPool cnxPool;
+ @Getter
private final Timer timer;
private boolean needStopTimer;
private final ExecutorProvider externalExecutorProvider;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
index 84be46f..3ac8676 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.impl.transaction;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@@ -67,8 +69,11 @@ public class TransactionBuilderImpl implements TransactionBuilder {
future.completeExceptionally(throwable);
return;
}
- future.complete(new TransactionImpl(client, txnTimeout,
- txnID.getLeastSigBits(), txnID.getMostSigBits()));
+ TransactionImpl transaction = new TransactionImpl(client, txnTimeout,
+ txnID.getLeastSigBits(), txnID.getMostSigBits());
+ client.getTimer().newTimeout(transaction,
+ txnTimeout, timeUnit);
+ future.complete(transaction);
});
return future;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 458976a..4128a6f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.impl.transaction;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -25,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.Lists;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
@@ -48,7 +51,7 @@ import org.apache.pulsar.common.util.FutureUtil;
*/
@Slf4j
@Getter
-public class TransactionImpl implements Transaction {
+public class TransactionImpl implements Transaction , TimerTask {
private final PulsarClientImpl client;
private final long transactionTimeoutMs;
@@ -63,6 +66,13 @@ public class TransactionImpl implements Transaction {
private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
private final ArrayList<CompletableFuture<Void>> ackFutureList;
private volatile State state;
+ private final AtomicReferenceFieldUpdater<TransactionImpl, State> STATE_UPDATE =
+ AtomicReferenceFieldUpdater.newUpdater(TransactionImpl.class, State.class, "state");
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIMEOUT);
+ }
public enum State {
OPEN,
@@ -70,7 +80,8 @@ public class TransactionImpl implements Transaction {
ABORTING,
COMMITTED,
ABORTED,
- ERROR
+ ERROR,
+ TIMEOUT
}
TransactionImpl(PulsarClientImpl client,
@@ -93,7 +104,8 @@ public class TransactionImpl implements Transaction {
// register the topics that will be modified by this transaction
public CompletableFuture<Void> registerProducedTopic(String topic) {
- return checkIfOpen().thenCompose(value -> {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ if (checkIfOpen(completableFuture)) {
synchronized (TransactionImpl.this) {
// we need to issue the request to TC to register the produced topic
return registerPartitionMap.compute(topic, (key, future) -> {
@@ -106,7 +118,9 @@ public class TransactionImpl implements Transaction {
}
});
}
- });
+ } else {
+ return completableFuture;
+ }
}
public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
@@ -115,7 +129,8 @@ public class TransactionImpl implements Transaction {
// register the topics that will be modified by this transaction
public CompletableFuture<Void> registerAckedTopic(String topic, String subscription) {
- return checkIfOpen().thenCompose(value -> {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ if (checkIfOpen(completableFuture)) {
synchronized (TransactionImpl.this) {
// we need to issue the request to TC to register the acked topic
return registerSubscriptionMap.compute(Pair.of(topic, subscription), (key, future) -> {
@@ -128,7 +143,9 @@ public class TransactionImpl implements Transaction {
}
});
}
- });
+ } else {
+ return completableFuture;
+ }
}
public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {
@@ -213,11 +230,14 @@ public class TransactionImpl implements Transaction {
return new TxnID(txnIdMostBits, txnIdLeastBits);
}
- private CompletableFuture<Void> checkIfOpen() {
+ public <T> boolean checkIfOpen(CompletableFuture<T> completableFuture) {
if (state == State.OPEN) {
- return CompletableFuture.completedFuture(null);
+ return true;
} else {
- return invalidTxnStatusFuture();
+ completableFuture
+ .completeExceptionally(new InvalidTxnStatusException(
+ new TxnID(txnIdMostBits, txnIdLeastBits).toString(), state.name(), State.OPEN.name()));
+ return false;
}
}