You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2018/07/19 18:00:28 UTC

[1/2] activemq git commit: AMQ-7015 Added a purgeRecoveredXATransactions property on the KahaDB adaptor to purge prepared XA messages on recovery

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.15.x 7313d72c6 -> 63779e2f7


AMQ-7015 Added a purgeRecoveredXATransactions property on the KahaDB adaptor to purge prepared XA messages on recovery


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ce7498c9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ce7498c9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ce7498c9

Branch: refs/heads/activemq-5.15.x
Commit: ce7498c971b99e2515f07aab36418a1a0f19c03e
Parents: 7313d72
Author: hkesler <hk...@contractor.cengage.com>
Authored: Thu Jul 19 11:53:04 2018 -0600
Committer: hkesler <hk...@contractor.cengage.com>
Committed: Thu Jul 19 11:53:04 2018 -0600

----------------------------------------------------------------------
 .../store/kahadb/KahaDBPersistenceAdapter.java  |  8 +++
 .../activemq/store/kahadb/MessageDatabase.java  | 16 +++++
 .../broker/BrokerRestartTestSupport.java        |  7 ++-
 .../activemq/broker/XARecoveryBrokerTest.java   | 66 ++++++++++++++++++++
 4 files changed, 95 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ce7498c9/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index c4f480c..fa0f7c2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -547,6 +547,14 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
         letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
     }
 
+    public boolean isPurgeRecoveredXATransactions() {
+        return letter.isPurgeRecoveredXATransactions();
+    }
+
+    public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) {
+        letter.setPurgeRecoveredXATransactions(purgeRecoveredXATransactions);
+    }
+
     @Override
     public void setBrokerService(BrokerService brokerService) {
         super.setBrokerService(brokerService);

http://git-wip-us.apache.org/repos/asf/activemq/blob/ce7498c9/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 94de6ea..76d0da0 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -272,6 +272,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     private boolean ignoreMissingJournalfiles = false;
     private int indexCacheSize = 10000;
     private boolean checkForCorruptJournalFiles = false;
+    private boolean purgeRecoveredXATransactions = false;
     private boolean checksumJournalFiles = true;
     protected boolean forceRecoverIndex = false;
     private boolean archiveCorruptedIndex = false;
@@ -748,6 +749,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 for (TransactionId txId : preparedTransactions.keySet()) {
                     LOG.warn("Recovered prepared XA TX: [{}]", txId);
                 }
+
+                if (purgeRecoveredXATransactions){
+                    if (!preparedTransactions.isEmpty()){
+                        LOG.warn("Purging " +  preparedTransactions.size() + " recovered prepared XA TXs" );
+                        preparedTransactions.clear();
+                    }
+                }
             }
 
         } finally {
@@ -3340,6 +3348,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
     }
 
+    public boolean isPurgeRecoveredXATransactions() {
+        return purgeRecoveredXATransactions;
+    }
+
+    public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) {
+        this.purgeRecoveredXATransactions = purgeRecoveredXATransactions;
+    }
+
     public boolean isChecksumJournalFiles() {
         return checksumJournalFiles;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ce7498c9/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
index c4e3848..111494a 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
@@ -58,10 +58,13 @@ public class BrokerRestartTestSupport extends BrokerTestSupport {
      * @throws URISyntaxException
      */
     protected void restartBroker() throws Exception {
+        stopBroker();
+        broker.start();
+    }
+
+    protected void stopBroker() throws Exception {
         broker.stop();
         broker.waitUntilStopped();
         broker = createRestartedBroker();
-        broker.start();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ce7498c9/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 9660ef0..b52681a 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -267,6 +267,72 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         assertEmptyDLQ();
     }
 
+    public void testPreparedTransactionRecoveredPurgeOnRestart() throws Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Set up the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Prepare 4 message sends.
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            XATransactionId txid = createXATransaction(sessionInfo);
+            connection.send(createBeginTransaction(connectionInfo, txid));
+
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+
+            // Prepare
+            connection.send(createPrepareTransaction(connectionInfo, txid));
+        }
+
+        // The sends are prepared but not committed, so they should not be delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // Restart the broker, enabling purgeRecoveredXATransactions when the store is KahaDB.
+        stopBroker();
+        if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
+            KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
+            adapter.setPurgeRecoveredXATransactions(true);
+            LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
+        }
+        broker.start();
+
+        // Set up the consumer and try to receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // The transactions were prepared but never committed, so nothing should be delivered after the restart.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+
+        Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+        assertNotNull(response);
+        DataArrayResponse dar = (DataArrayResponse)response;
+
+        // The prepared transactions should have been purged, so expect none to be recovered.
+        assertEquals(0, dar.getData().length);
+
+    }
+
     private void assertEmptyDLQ() throws Exception {
         try {
             DestinationViewMBean destinationView = getProxyToDestination(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));


[2/2] activemq git commit: AMQ-7015 This closes #290

Posted by jg...@apache.org.
AMQ-7015 This closes #290


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/63779e2f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/63779e2f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/63779e2f

Branch: refs/heads/activemq-5.15.x
Commit: 63779e2f78001ffbbb490e5b00bf83134be58756
Parents: ce7498c
Author: Jeff Genender <jg...@savoirtech.com>
Authored: Thu Jul 19 12:00:02 2018 -0600
Committer: Jeff Genender <jg...@savoirtech.com>
Committed: Thu Jul 19 12:00:02 2018 -0600

----------------------------------------------------------------------

----------------------------------------------------------------------