You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/05/07 12:24:26 UTC
[pulsar] branch master updated: [Fix][txn] Make transaction stats consistent at end txn (#15472)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5e6580abc8a [Fix][txn] Make transaction stats consistent at end txn (#15472)
5e6580abc8a is described below
commit 5e6580abc8aea515581f0d23964b46bb58e493f4
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Sat May 7 20:24:15 2022 +0800
[Fix][txn] Make transaction stats consistent at end txn (#15472)
### Motivation
Once the end-transaction entry has been appended to the transaction log, the transaction has ended, so it should be removed from the `txnMetaMap`. If `transactionLog` then fails to delete the position, we only need to log the failure.
### Modification
When deleting the transaction log position fails, do not complete the future exceptionally; only log a warning.
---
.../pulsar/broker/transaction/TransactionTest.java | 24 ++++++++++++++++++
.../impl/MLTransactionMetadataStore.java | 29 ++++++++++------------
2 files changed, 37 insertions(+), 16 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 60ecf04e602..8f2dacc434f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -102,6 +102,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
@@ -971,4 +972,27 @@ public class TransactionTest extends TransactionTestBase {
Integer.parseInt(lastConfirmedEntry[1]) - 2);
});
}
+
+ @Test
+ public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception {
+ TransactionMetadataStore transactionMetadataStore = getPulsarServiceList().get(0)
+ .getTransactionMetadataStoreService()
+ .getStores()
+ .get(new TransactionCoordinatorID(0));
+
+ Field field = MLTransactionMetadataStore.class.getDeclaredField("transactionLog");
+ field.setAccessible(true);
+ MLTransactionLogImpl transactionLog = (MLTransactionLogImpl) field.get(transactionMetadataStore);
+ Field field1 = MLTransactionLogImpl.class.getDeclaredField("cursor");
+ field1.setAccessible(true);
+ ManagedCursorImpl managedCursor = (ManagedCursorImpl) field1.get(transactionLog);
+ managedCursor.close();
+
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+
+ transaction.commit().get();
+ }
}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index f93de8b0175..685d57e664e 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -372,23 +372,20 @@ public class MLTransactionMetadataStore
this.transactionTimeoutCount.increment();
}
if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
- transactionLog.deletePosition(txnMetaListPair.getRight()).whenComplete((v, ex) -> {
- if (ex != null) {
- promise.completeExceptionally(ex);
- return;
- }
- this.transactionMetadataStoreStats
- .addTransactionExecutionLatencySample(System.currentTimeMillis()
- - txnMetaListPair.getLeft().getOpenTimestamp());
- if (newStatus == TxnStatus.COMMITTED) {
- committedTransactionCount.increment();
- } else {
- abortedTransactionCount.increment();
- }
- txnMetaMap.remove(txnID.getLeastSigBits());
- promise.complete(null);
+ this.transactionMetadataStoreStats
+ .addTransactionExecutionLatencySample(System.currentTimeMillis()
+ - txnMetaListPair.getLeft().getOpenTimestamp());
+ if (newStatus == TxnStatus.COMMITTED) {
+ committedTransactionCount.increment();
+ } else {
+ abortedTransactionCount.increment();
+ }
+ txnMetaMap.remove(txnID.getLeastSigBits());
+ transactionLog.deletePosition(txnMetaListPair.getRight()).exceptionally(ex -> {
+ log.warn("Failed to delete transaction log position "
+ + "at end transaction [{}]", txnID);
+ return null;
});
- return;
}
promise.complete(null);
} catch (InvalidTxnStatusException e) {