You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/09/18 09:39:24 UTC

activemq git commit: AMQ-7052 - Fix JdbcXARecoveryBrokerTest and mLevelDBXARecoveryBrokerTest tests

Repository: activemq
Updated Branches:
  refs/heads/master e1e33e7ae -> b92aaa2f5


AMQ-7052 - Fix JdbcXARecoveryBrokerTest and mLevelDBXARecoveryBrokerTest tests

Signed-off-by: gtully <ga...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b92aaa2f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b92aaa2f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b92aaa2f

Branch: refs/heads/master
Commit: b92aaa2f583e1ef92bd84678701ced43c1d1d9a9
Parents: e1e33e7
Author: Alan Protasio <al...@gmail.com>
Authored: Fri Sep 14 01:39:17 2018 -0700
Committer: gtully <ga...@gmail.com>
Committed: Tue Sep 18 10:26:29 2018 +0100

----------------------------------------------------------------------
 activemq-unit-tests/pom.xml                     |   1 +
 .../broker/KahaDBXARecoveryBrokerTest.java      | 172 +++++++++++++++++++
 .../activemq/broker/XARecoveryBrokerTest.java   | 134 ---------------
 .../org/apache/activemq/bugs/AMQ6463Test.java   |  18 +-
 4 files changed, 183 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b92aaa2f/activemq-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index 2952966..6ef398d 100644
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -770,6 +770,7 @@
                 <exclude>org/apache/activemq/broker/jmx/MBeanTest.*</exclude>
                 <exclude>org/apache/activemq/broker/jmx/PurgeTest.*</exclude>
                 <exclude>org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.*</exclude>
+                <exclude>org/apache/activemq/broker/KahaDBXARecoveryBrokerTest.*</exclude>
                 <exclude>org/apache/activemq/broker/policy/AbortSlowConsumerTest.*</exclude>
                 <exclude>org/apache/activemq/broker/region/DestinationGCTest.*</exclude>
                 <exclude>org/apache/activemq/broker/region/DestinationRemoveRestartTest.*</exclude>

http://git-wip-us.apache.org/repos/asf/activemq/blob/b92aaa2f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/KahaDBXARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/KahaDBXARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/KahaDBXARecoveryBrokerTest.java
new file mode 100644
index 0000000..2c4588e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/KahaDBXARecoveryBrokerTest.java
@@ -0,0 +1,172 @@
+package org.apache.activemq.broker;
+
+import junit.framework.Test;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DataArrayResponse;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+public class KahaDBXARecoveryBrokerTest  extends XARecoveryBrokerTest {
+
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception {
+        super.configureBroker(broker);
+
+        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+        broker.setPersistenceAdapter(persistenceAdapter);
+    }
+
+    public static Test suite() {
+        return suite(KahaDBXARecoveryBrokerTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    protected ActiveMQDestination createDestination() {
+        return new ActiveMQQueue("test");
+    }
+
+    public void testPreparedTransactionRecoveredPurgeCommitOnRestart() throws Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Prepare 4 message sends.
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            XATransactionId txid = createXATransaction(sessionInfo);
+            connection.send(createBeginTransaction(connectionInfo, txid));
+
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+
+            // Prepare
+            connection.send(createPrepareTransaction(connectionInfo, txid));
+        }
+
+        // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // restart the broker.
+        stopBroker();
+        if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
+            KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
+            adapter.setPurgeRecoveredXATransactionStrategy("COMMIT");
+            LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
+        }
+        broker.start();
+
+        // Setup the consumer and try receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Since committed ... they should get delivered.
+        for (int i = 0; i < 4; i++) {
+            assertNotNull(receiveMessage(connection));
+        }
+        assertNoMessagesLeft(connection);
+
+        Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+        assertNotNull(response);
+        DataArrayResponse dar = (DataArrayResponse)response;
+
+        //These should be purged so expect 0
+        assertEquals(0, dar.getData().length);
+
+    }
+
+    public void testPreparedTransactionRecoveredPurgeRollbackOnRestart() throws Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Prepare 4 message sends.
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            XATransactionId txid = createXATransaction(sessionInfo);
+            connection.send(createBeginTransaction(connectionInfo, txid));
+
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+
+            // Prepare
+            connection.send(createPrepareTransaction(connectionInfo, txid));
+        }
+
+        // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // restart the broker.
+        stopBroker();
+        if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
+            KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
+            adapter.setPurgeRecoveredXATransactionStrategy("ROLLBACK");
+            LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
+        }
+        broker.start();
+
+        // Setup the consumer and try receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Since rolledback but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+
+        Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+        assertNotNull(response);
+        DataArrayResponse dar = (DataArrayResponse)response;
+
+        //These should be purged so expect 0
+        assertEquals(0, dar.getData().length);
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/b92aaa2f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index c9154a3..6a8b3f4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -267,140 +267,6 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         assertEmptyDLQ();
     }
 
-    public void testPreparedTransactionRecoveredPurgeRollbackOnRestart() throws Exception {
-
-        ActiveMQDestination destination = createDestination();
-
-        // Setup the producer and send the message.
-        StubConnection connection = createConnection();
-        ConnectionInfo connectionInfo = createConnectionInfo();
-        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
-        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
-        connection.send(connectionInfo);
-        connection.send(sessionInfo);
-        connection.send(producerInfo);
-        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
-        connection.send(consumerInfo);
-
-        // Prepare 4 message sends.
-        for (int i = 0; i < 4; i++) {
-            // Begin the transaction.
-            XATransactionId txid = createXATransaction(sessionInfo);
-            connection.send(createBeginTransaction(connectionInfo, txid));
-
-            Message message = createMessage(producerInfo, destination);
-            message.setPersistent(true);
-            message.setTransactionId(txid);
-            connection.send(message);
-
-            // Prepare
-            connection.send(createPrepareTransaction(connectionInfo, txid));
-        }
-
-        // Since prepared but not committed.. they should not get delivered.
-        assertNull(receiveMessage(connection));
-        assertNoMessagesLeft(connection);
-        connection.request(closeConnectionInfo(connectionInfo));
-
-        // restart the broker.
-        stopBroker();
-        if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
-            KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
-            adapter.setPurgeRecoveredXATransactionStrategy("ROLLBACK");
-            LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
-        }
-        broker.start();
-
-        // Setup the consumer and try receive the message.
-        connection = createConnection();
-        connectionInfo = createConnectionInfo();
-        sessionInfo = createSessionInfo(connectionInfo);
-        connection.send(connectionInfo);
-        connection.send(sessionInfo);
-        consumerInfo = createConsumerInfo(sessionInfo, destination);
-        connection.send(consumerInfo);
-
-        // Since rolledback but not committed.. they should not get delivered.
-        assertNull(receiveMessage(connection));
-        assertNoMessagesLeft(connection);
-
-        Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
-        assertNotNull(response);
-        DataArrayResponse dar = (DataArrayResponse)response;
-
-        //These should be purged so expect 0
-        assertEquals(0, dar.getData().length);
-
-    }
-
-    public void testPreparedTransactionRecoveredPurgeCommitOnRestart() throws Exception {
-
-        ActiveMQDestination destination = createDestination();
-
-        // Setup the producer and send the message.
-        StubConnection connection = createConnection();
-        ConnectionInfo connectionInfo = createConnectionInfo();
-        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
-        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
-        connection.send(connectionInfo);
-        connection.send(sessionInfo);
-        connection.send(producerInfo);
-        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
-        connection.send(consumerInfo);
-
-        // Prepare 4 message sends.
-        for (int i = 0; i < 4; i++) {
-            // Begin the transaction.
-            XATransactionId txid = createXATransaction(sessionInfo);
-            connection.send(createBeginTransaction(connectionInfo, txid));
-
-            Message message = createMessage(producerInfo, destination);
-            message.setPersistent(true);
-            message.setTransactionId(txid);
-            connection.send(message);
-
-            // Prepare
-            connection.send(createPrepareTransaction(connectionInfo, txid));
-        }
-
-        // Since prepared but not committed.. they should not get delivered.
-        assertNull(receiveMessage(connection));
-        assertNoMessagesLeft(connection);
-        connection.request(closeConnectionInfo(connectionInfo));
-
-        // restart the broker.
-        stopBroker();
-        if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
-            KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
-            adapter.setPurgeRecoveredXATransactionStrategy("COMMIT");
-            LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
-        }
-        broker.start();
-
-        // Setup the consumer and try receive the message.
-        connection = createConnection();
-        connectionInfo = createConnectionInfo();
-        sessionInfo = createSessionInfo(connectionInfo);
-        connection.send(connectionInfo);
-        connection.send(sessionInfo);
-        consumerInfo = createConsumerInfo(sessionInfo, destination);
-        connection.send(consumerInfo);
-
-        // Since committed ... they should get delivered.
-        for (int i = 0; i < 4; i++) {
-            assertNotNull(receiveMessage(connection));
-        }
-        assertNoMessagesLeft(connection);
-
-        Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
-        assertNotNull(response);
-        DataArrayResponse dar = (DataArrayResponse)response;
-
-        //These should be purged so expect 0
-        assertEquals(0, dar.getData().length);
-
-    }
-
     private void assertEmptyDLQ() throws Exception {
         try {
             DestinationViewMBean destinationView = getProxyToDestination(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));

http://git-wip-us.apache.org/repos/asf/activemq/blob/b92aaa2f/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java
index e73bd7c..2bbdb68 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java
@@ -63,10 +63,10 @@ public class AMQ6463Test extends JmsTestSupport {
 
         TextMessage message = session.createTextMessage("test msg");
         final int numMessages = 20;
-        long time = 5;
+
         message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
-        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
-        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 5);
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 0);
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 0);
         message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, numMessages - 1);
 
         producer.send(message);
@@ -78,14 +78,14 @@ public class AMQ6463Test extends JmsTestSupport {
             public boolean isSatisified() throws Exception {
                 return gotUsageBlocked.get();
             }
-        }));
+        }, 60000));
 
         MessageConsumer consumer = session.createConsumer(queueA);
         TextMessage msg;
         for (int idx = 0; idx < numMessages; ++idx) {
-        	msg = (TextMessage) consumer.receive(10000);
-        	assertNotNull("received: " + idx, msg);
-        	msg.acknowledge();
+            msg = (TextMessage) consumer.receive(10000);
+            assertNotNull("received: " + idx, msg);
+            msg.acknowledge();
         }
         assertTrue("no errors in the log", errors.get() == 0);
         assertTrue("got blocked message", gotUsageBlocked.get());
@@ -99,6 +99,8 @@ public class AMQ6463Test extends JmsTestSupport {
         service.setSchedulerSupport(true);
         service.setDeleteAllMessagesOnStartup(true);
 
+        service.getSystemUsage().getMemoryUsage().setLimit(512);
+
         // Setup a destination policy where it takes only 1 message at a time.
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry policy = new PolicyEntry();
@@ -131,7 +133,7 @@ public class AMQ6463Test extends JmsTestSupport {
 
         super.setUp();
     }
-    
+
     protected void tearDown() throws Exception {
         org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
         rootLogger.removeAppender(appender);