You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2019/06/14 14:34:43 UTC

[activemq] 04/04: AMQ-7225 - fix intermittent failure, avoid gc of partial tx pending commit

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit 6ff79d85aae0068ede78927b7ea13d2783d9c767
Author: gtully <ga...@gmail.com>
AuthorDate: Tue Jun 11 12:36:02 2019 +0100

    AMQ-7225 - fix intermittent failure, avoid gc of partial tx pending commit
    
    (cherry picked from commit 28a0cc6e5a78adb4b0b0134c860911c921f6a074)
---
 .../activemq/store/kahadb/MultiKahaDBTransactionStore.java | 14 +++++++++-----
 .../org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java    |  2 +-
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index 5befa92..d1b2d8e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -61,7 +61,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
     static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
     final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
     final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
-    final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
+    final ConcurrentMap<TransactionId, Tx> pendingCommit = new ConcurrentHashMap<TransactionId, Tx>();
     private Journal journal;
     private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
     private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
@@ -279,10 +279,12 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
 
     public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
         tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
+        pendingCommit.put(txid, tx);
     }
 
     public void persistCompletion(TransactionId txid) throws IOException {
         store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
+        pendingCommit.remove(txid);
     }
 
     private Location store(JournalCommand<?> data) throws IOException {
@@ -335,6 +337,9 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         for (Tx tx : inflightTransactions.values()) {
             knownDataFileIds.remove(tx.getPreparedLocationId());
         }
+        for (Tx tx : pendingCommit.values()) {
+            knownDataFileIds.remove(tx.getPreparedLocationId());
+        }
         try {
             journal.removeDataFiles(knownDataFileIds);
         } catch (Exception e) {
@@ -360,8 +365,8 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
             process(location, load(location));
             location = journal.getNextLocation(location);
         }
-        recoveredPendingCommit.addAll(inflightTransactions.keySet());
-        LOG.info("pending local transactions: " + recoveredPendingCommit);
+        pendingCommit.putAll(inflightTransactions);
+        LOG.info("pending local transactions: " + pendingCommit.keySet());
     }
 
     public JournalCommand<?> load(Location location) throws IOException {
@@ -417,10 +422,9 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
             for (TransactionId txid : broker.getPreparedTransactions(null)) {
                 if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
                     try {
-                        if (recoveredPendingCommit.contains(txid)) {
+                        if (pendingCommit.keySet().contains(txid)) {
                             LOG.info("delivering pending commit outcome for tid: " + txid);
                             broker.commitTransaction(null, txid, false);
-                            recoveredPendingCommit.remove(txid);
                         } else {
                             LOG.info("delivering rollback outcome to store for tid: " + txid);
                             broker.forgetTransaction(null, txid);
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
index 4a7e9c6..da96431 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
@@ -207,7 +207,7 @@ public class MKahaDBTxRecoveryTest {
 
         multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
         multiKahaDBPersistenceAdapter.setJournalMaxFileLength(4*1024);
-        multiKahaDBPersistenceAdapter.setJournalCleanupInterval(CLEANUP_INTERVAL_MILLIS);
+        multiKahaDBPersistenceAdapter.setJournalCleanupInterval(10);
 
         broker = createBroker(multiKahaDBPersistenceAdapter);
     }