You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2018/07/19 18:00:28 UTC
[1/2] activemq git commit: AMQ-7015 Added a
purgeRecoveredXATransactions property on the KahaDB adaptor to purge prepared
XA messages on recovery
Repository: activemq
Updated Branches:
refs/heads/activemq-5.15.x 7313d72c6 -> 63779e2f7
AMQ-7015 Added a purgeRecoveredXATransactions property on the KahaDB adaptor to purge prepared XA messages on recovery
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ce7498c9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ce7498c9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ce7498c9
Branch: refs/heads/activemq-5.15.x
Commit: ce7498c971b99e2515f07aab36418a1a0f19c03e
Parents: 7313d72
Author: hkesler <hk...@contractor.cengage.com>
Authored: Thu Jul 19 11:53:04 2018 -0600
Committer: hkesler <hk...@contractor.cengage.com>
Committed: Thu Jul 19 11:53:04 2018 -0600
----------------------------------------------------------------------
.../store/kahadb/KahaDBPersistenceAdapter.java | 8 +++
.../activemq/store/kahadb/MessageDatabase.java | 16 +++++
.../broker/BrokerRestartTestSupport.java | 7 ++-
.../activemq/broker/XARecoveryBrokerTest.java | 66 ++++++++++++++++++++
4 files changed, 95 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/ce7498c9/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index c4f480c..fa0f7c2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -547,6 +547,14 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
}
+ public boolean isPurgeRecoveredXATransactions() {
+ return letter.isPurgeRecoveredXATransactions();
+ }
+
+ public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) {
+ letter.setPurgeRecoveredXATransactions(purgeRecoveredXATransactions);
+ }
+
@Override
public void setBrokerService(BrokerService brokerService) {
super.setBrokerService(brokerService);
http://git-wip-us.apache.org/repos/asf/activemq/blob/ce7498c9/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 94de6ea..76d0da0 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -272,6 +272,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private boolean ignoreMissingJournalfiles = false;
private int indexCacheSize = 10000;
private boolean checkForCorruptJournalFiles = false;
+ private boolean purgeRecoveredXATransactions = false;
private boolean checksumJournalFiles = true;
protected boolean forceRecoverIndex = false;
private boolean archiveCorruptedIndex = false;
@@ -748,6 +749,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
for (TransactionId txId : preparedTransactions.keySet()) {
LOG.warn("Recovered prepared XA TX: [{}]", txId);
}
+
+ if (purgeRecoveredXATransactions){
+ if (!preparedTransactions.isEmpty()){
+ LOG.warn("Purging " + preparedTransactions.size() + " recovered prepared XA TXs" );
+ preparedTransactions.clear();
+ }
+ }
}
} finally {
@@ -3340,6 +3348,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
}
+ public boolean isPurgeRecoveredXATransactions() {
+ return purgeRecoveredXATransactions;
+ }
+
+ public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) {
+ this.purgeRecoveredXATransactions = purgeRecoveredXATransactions;
+ }
+
public boolean isChecksumJournalFiles() {
return checksumJournalFiles;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ce7498c9/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
index c4e3848..111494a 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
@@ -58,10 +58,13 @@ public class BrokerRestartTestSupport extends BrokerTestSupport {
* @throws URISyntaxException
*/
protected void restartBroker() throws Exception {
+ stopBroker();
+ broker.start();
+ }
+
+ protected void stopBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
broker = createRestartedBroker();
- broker.start();
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ce7498c9/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 9660ef0..b52681a 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -267,6 +267,72 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertEmptyDLQ();
}
+ public void testPreparedTransactionRecoveredPurgeOnRestart() throws Exception {
+
+ ActiveMQDestination destination = createDestination();
+
+ // Set up the connection, session, producer and a consumer on the destination.
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+ connection.send(consumerInfo);
+
+ // Send 4 persistent messages, each in its own XA transaction that is prepared but not committed.
+ for (int i = 0; i < 4; i++) {
+ // Begin a new XA transaction for this message.
+ XATransactionId txid = createXATransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(true);
+ message.setTransactionId(txid);
+ connection.send(message);
+
+ // Prepare the transaction but do not commit it.
+ connection.send(createPrepareTransaction(connectionInfo, txid));
+ }
+
+ // The transactions are prepared but not committed, so nothing should be delivered.
+ assertNull(receiveMessage(connection));
+ assertNoMessagesLeft(connection);
+ connection.request(closeConnectionInfo(connectionInfo));
+
+ // Stop the broker, enable purging of recovered XA transactions (only if the adapter is KahaDB), then start it.
+ stopBroker();
+ if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
+ KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
+ adapter.setPurgeRecoveredXATransactions(true);
+ LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
+ }
+ broker.start();
+
+ // Set up a new connection, session and consumer, then try to receive a message.
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ consumerInfo = createConsumerInfo(sessionInfo, destination);
+ connection.send(consumerInfo);
+
+ // Nothing was committed before the restart, so nothing should be delivered now either.
+ assertNull(receiveMessage(connection));
+ assertNoMessagesLeft(connection);
+
+ Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+ assertNotNull(response);
+ DataArrayResponse dar = (DataArrayResponse)response;
+
+ // The prepared transactions should have been purged on restart, so expect RECOVER to return none.
+ assertEquals(0, dar.getData().length);
+
+ }
+
private void assertEmptyDLQ() throws Exception {
try {
DestinationViewMBean destinationView = getProxyToDestination(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
[2/2] activemq git commit: AMQ-7015 This closes #290
Posted by jg...@apache.org.
AMQ-7015 This closes #290
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/63779e2f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/63779e2f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/63779e2f
Branch: refs/heads/activemq-5.15.x
Commit: 63779e2f78001ffbbb490e5b00bf83134be58756
Parents: ce7498c
Author: Jeff Genender <jg...@savoirtech.com>
Authored: Thu Jul 19 12:00:02 2018 -0600
Committer: Jeff Genender <jg...@savoirtech.com>
Committed: Thu Jul 19 12:00:02 2018 -0600
----------------------------------------------------------------------
----------------------------------------------------------------------