You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/05/07 12:24:26 UTC

[pulsar] branch master updated: [Fix][txn] Make transaction stats consistent at end txn (#15472)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e6580abc8a [Fix][txn] Make transaction stats consistent at end txn (#15472)
5e6580abc8a is described below

commit 5e6580abc8aea515581f0d23964b46bb58e493f4
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Sat May 7 20:24:15 2022 +0800

    [Fix][txn] Make transaction stats consistent at end txn (#15472)
    
    ### Motivation
    When the end transaction log is appended to the transaction log, the transaction ended. The transaction should be removed from the `txnMetaMap`. If transactionLog fails to delete the location, we only need to log it.
    ### Modification
    Not complete exceptionally, but only give a warn log.
---
 .../pulsar/broker/transaction/TransactionTest.java | 24 ++++++++++++++++++
 .../impl/MLTransactionMetadataStore.java           | 29 ++++++++++------------
 2 files changed, 37 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 60ecf04e602..8f2dacc434f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -102,6 +102,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
@@ -971,4 +972,27 @@ public class TransactionTest extends TransactionTestBase {
                     Integer.parseInt(lastConfirmedEntry[1]) - 2);
         });
     }
+
+    @Test
+    public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception {
+        TransactionMetadataStore transactionMetadataStore = getPulsarServiceList().get(0)
+                .getTransactionMetadataStoreService()
+                .getStores()
+                .get(new TransactionCoordinatorID(0));
+
+        Field field = MLTransactionMetadataStore.class.getDeclaredField("transactionLog");
+        field.setAccessible(true);
+        MLTransactionLogImpl transactionLog = (MLTransactionLogImpl) field.get(transactionMetadataStore);
+        Field field1 = MLTransactionLogImpl.class.getDeclaredField("cursor");
+        field1.setAccessible(true);
+        ManagedCursorImpl managedCursor = (ManagedCursorImpl) field1.get(transactionLog);
+        managedCursor.close();
+
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        transaction.commit().get();
+    }
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index f93de8b0175..685d57e664e 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -372,23 +372,20 @@ public class MLTransactionMetadataStore
                                     this.transactionTimeoutCount.increment();
                                 }
                                 if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
-                                    transactionLog.deletePosition(txnMetaListPair.getRight()).whenComplete((v, ex) -> {
-                                        if (ex != null) {
-                                            promise.completeExceptionally(ex);
-                                            return;
-                                        }
-                                        this.transactionMetadataStoreStats
-                                                .addTransactionExecutionLatencySample(System.currentTimeMillis()
-                                                        - txnMetaListPair.getLeft().getOpenTimestamp());
-                                        if (newStatus == TxnStatus.COMMITTED) {
-                                            committedTransactionCount.increment();
-                                        } else {
-                                            abortedTransactionCount.increment();
-                                        }
-                                        txnMetaMap.remove(txnID.getLeastSigBits());
-                                        promise.complete(null);
+                                    this.transactionMetadataStoreStats
+                                            .addTransactionExecutionLatencySample(System.currentTimeMillis()
+                                                    - txnMetaListPair.getLeft().getOpenTimestamp());
+                                    if (newStatus == TxnStatus.COMMITTED) {
+                                        committedTransactionCount.increment();
+                                    } else {
+                                        abortedTransactionCount.increment();
+                                    }
+                                    txnMetaMap.remove(txnID.getLeastSigBits());
+                                    transactionLog.deletePosition(txnMetaListPair.getRight()).exceptionally(ex -> {
+                                        log.warn("Failed to delete transaction log position "
+                                                + "at end transaction [{}]", txnID);
+                                        return null;
                                     });
-                                    return;
                                 }
                                 promise.complete(null);
                             } catch (InvalidTxnStatusException e) {