You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2019/06/14 14:34:43 UTC
[activemq] 04/04: AMQ-7225 - fix intermittent failure, avoid gc of partial tx pending commit
This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
commit 6ff79d85aae0068ede78927b7ea13d2783d9c767
Author: gtully <ga...@gmail.com>
AuthorDate: Tue Jun 11 12:36:02 2019 +0100
AMQ-7225 - fix intermittent failure, avoid gc of partial tx pending commit
(cherry picked from commit 28a0cc6e5a78adb4b0b0134c860911c921f6a074)
---
.../activemq/store/kahadb/MultiKahaDBTransactionStore.java | 14 +++++++++-----
.../org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java | 2 +-
2 files changed, 10 insertions(+), 6 deletions(-)
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index 5befa92..d1b2d8e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -61,7 +61,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
- final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
+ final ConcurrentMap<TransactionId, Tx> pendingCommit = new ConcurrentHashMap<TransactionId, Tx>();
private Journal journal;
private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
@@ -279,10 +279,12 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
+ pendingCommit.put(txid, tx);
}
public void persistCompletion(TransactionId txid) throws IOException {
store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
+ pendingCommit.remove(txid);
}
private Location store(JournalCommand<?> data) throws IOException {
@@ -335,6 +337,9 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
for (Tx tx : inflightTransactions.values()) {
knownDataFileIds.remove(tx.getPreparedLocationId());
}
+ for (Tx tx : pendingCommit.values()) {
+ knownDataFileIds.remove(tx.getPreparedLocationId());
+ }
try {
journal.removeDataFiles(knownDataFileIds);
} catch (Exception e) {
@@ -360,8 +365,8 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
process(location, load(location));
location = journal.getNextLocation(location);
}
- recoveredPendingCommit.addAll(inflightTransactions.keySet());
- LOG.info("pending local transactions: " + recoveredPendingCommit);
+ pendingCommit.putAll(inflightTransactions);
+ LOG.info("pending local transactions: " + pendingCommit.keySet());
}
public JournalCommand<?> load(Location location) throws IOException {
@@ -417,10 +422,9 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
for (TransactionId txid : broker.getPreparedTransactions(null)) {
if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
try {
- if (recoveredPendingCommit.contains(txid)) {
+ if (pendingCommit.keySet().contains(txid)) {
LOG.info("delivering pending commit outcome for tid: " + txid);
broker.commitTransaction(null, txid, false);
- recoveredPendingCommit.remove(txid);
} else {
LOG.info("delivering rollback outcome to store for tid: " + txid);
broker.forgetTransaction(null, txid);
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
index 4a7e9c6..da96431 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
@@ -207,7 +207,7 @@ public class MKahaDBTxRecoveryTest {
multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
multiKahaDBPersistenceAdapter.setJournalMaxFileLength(4*1024);
- multiKahaDBPersistenceAdapter.setJournalCleanupInterval(CLEANUP_INTERVAL_MILLIS);
+ multiKahaDBPersistenceAdapter.setJournalCleanupInterval(10);
broker = createBroker(multiKahaDBPersistenceAdapter);
}