You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/09/18 09:39:24 UTC
activemq git commit: AMQ-7052 - Fix JdbcXARecoveryBrokerTest and mLevelDBXARecoveryBrokerTest tests
Repository: activemq
Updated Branches:
refs/heads/master e1e33e7ae -> b92aaa2f5
AMQ-7052 - Fix JdbcXARecoveryBrokerTest and mLevelDBXARecoveryBrokerTest tests
Signed-off-by: gtully <ga...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b92aaa2f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b92aaa2f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b92aaa2f
Branch: refs/heads/master
Commit: b92aaa2f583e1ef92bd84678701ced43c1d1d9a9
Parents: e1e33e7
Author: Alan Protasio <al...@gmail.com>
Authored: Fri Sep 14 01:39:17 2018 -0700
Committer: gtully <ga...@gmail.com>
Committed: Tue Sep 18 10:26:29 2018 +0100
----------------------------------------------------------------------
activemq-unit-tests/pom.xml | 1 +
.../broker/KahaDBXARecoveryBrokerTest.java | 172 +++++++++++++++++++
.../activemq/broker/XARecoveryBrokerTest.java | 134 ---------------
.../org/apache/activemq/bugs/AMQ6463Test.java | 18 +-
4 files changed, 183 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/b92aaa2f/activemq-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index 2952966..6ef398d 100644
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -770,6 +770,7 @@
<exclude>org/apache/activemq/broker/jmx/MBeanTest.*</exclude>
<exclude>org/apache/activemq/broker/jmx/PurgeTest.*</exclude>
<exclude>org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.*</exclude>
+ <exclude>org/apache/activemq/broker/KahaDBXARecoveryBrokerTest.*</exclude>
<exclude>org/apache/activemq/broker/policy/AbortSlowConsumerTest.*</exclude>
<exclude>org/apache/activemq/broker/region/DestinationGCTest.*</exclude>
<exclude>org/apache/activemq/broker/region/DestinationRemoveRestartTest.*</exclude>
http://git-wip-us.apache.org/repos/asf/activemq/blob/b92aaa2f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/KahaDBXARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/KahaDBXARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/KahaDBXARecoveryBrokerTest.java
new file mode 100644
index 0000000..2c4588e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/KahaDBXARecoveryBrokerTest.java
@@ -0,0 +1,172 @@
+package org.apache.activemq.broker;
+
+import junit.framework.Test;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DataArrayResponse;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+public class KahaDBXARecoveryBrokerTest extends XARecoveryBrokerTest {
+
+ @Override
+ protected void configureBroker(BrokerService broker) throws Exception {
+ super.configureBroker(broker);
+
+ KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+ broker.setPersistenceAdapter(persistenceAdapter);
+ }
+
+ public static Test suite() {
+ return suite(KahaDBXARecoveryBrokerTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+ protected ActiveMQDestination createDestination() {
+ return new ActiveMQQueue("test");
+ }
+
+ public void testPreparedTransactionRecoveredPurgeCommitOnRestart() throws Exception {
+
+ ActiveMQDestination destination = createDestination();
+
+ // Setup the producer and send the message.
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+ connection.send(consumerInfo);
+
+ // Prepare 4 message sends.
+ for (int i = 0; i < 4; i++) {
+ // Begin the transaction.
+ XATransactionId txid = createXATransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(true);
+ message.setTransactionId(txid);
+ connection.send(message);
+
+ // Prepare
+ connection.send(createPrepareTransaction(connectionInfo, txid));
+ }
+
+ // Since prepared but not committed.. they should not get delivered.
+ assertNull(receiveMessage(connection));
+ assertNoMessagesLeft(connection);
+ connection.request(closeConnectionInfo(connectionInfo));
+
+ // restart the broker.
+ stopBroker();
+ if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
+ KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
+ adapter.setPurgeRecoveredXATransactionStrategy("COMMIT");
+ LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
+ }
+ broker.start();
+
+ // Setup the consumer and try receive the message.
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ consumerInfo = createConsumerInfo(sessionInfo, destination);
+ connection.send(consumerInfo);
+
+ // Since committed ... they should get delivered.
+ for (int i = 0; i < 4; i++) {
+ assertNotNull(receiveMessage(connection));
+ }
+ assertNoMessagesLeft(connection);
+
+ Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+ assertNotNull(response);
+ DataArrayResponse dar = (DataArrayResponse)response;
+
+ //These should be purged so expect 0
+ assertEquals(0, dar.getData().length);
+
+ }
+
+ public void testPreparedTransactionRecoveredPurgeRollbackOnRestart() throws Exception {
+
+ ActiveMQDestination destination = createDestination();
+
+ // Setup the producer and send the message.
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+ connection.send(consumerInfo);
+
+ // Prepare 4 message sends.
+ for (int i = 0; i < 4; i++) {
+ // Begin the transaction.
+ XATransactionId txid = createXATransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(true);
+ message.setTransactionId(txid);
+ connection.send(message);
+
+ // Prepare
+ connection.send(createPrepareTransaction(connectionInfo, txid));
+ }
+
+ // Since prepared but not committed.. they should not get delivered.
+ assertNull(receiveMessage(connection));
+ assertNoMessagesLeft(connection);
+ connection.request(closeConnectionInfo(connectionInfo));
+
+ // restart the broker.
+ stopBroker();
+ if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
+ KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
+ adapter.setPurgeRecoveredXATransactionStrategy("ROLLBACK");
+ LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
+ }
+ broker.start();
+
+ // Setup the consumer and try receive the message.
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ consumerInfo = createConsumerInfo(sessionInfo, destination);
+ connection.send(consumerInfo);
+
+ // Since rolledback but not committed.. they should not get delivered.
+ assertNull(receiveMessage(connection));
+ assertNoMessagesLeft(connection);
+
+ Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+ assertNotNull(response);
+ DataArrayResponse dar = (DataArrayResponse)response;
+
+ //These should be purged so expect 0
+ assertEquals(0, dar.getData().length);
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/b92aaa2f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index c9154a3..6a8b3f4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -267,140 +267,6 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertEmptyDLQ();
}
- public void testPreparedTransactionRecoveredPurgeRollbackOnRestart() throws Exception {
-
- ActiveMQDestination destination = createDestination();
-
- // Setup the producer and send the message.
- StubConnection connection = createConnection();
- ConnectionInfo connectionInfo = createConnectionInfo();
- SessionInfo sessionInfo = createSessionInfo(connectionInfo);
- ProducerInfo producerInfo = createProducerInfo(sessionInfo);
- connection.send(connectionInfo);
- connection.send(sessionInfo);
- connection.send(producerInfo);
- ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
- connection.send(consumerInfo);
-
- // Prepare 4 message sends.
- for (int i = 0; i < 4; i++) {
- // Begin the transaction.
- XATransactionId txid = createXATransaction(sessionInfo);
- connection.send(createBeginTransaction(connectionInfo, txid));
-
- Message message = createMessage(producerInfo, destination);
- message.setPersistent(true);
- message.setTransactionId(txid);
- connection.send(message);
-
- // Prepare
- connection.send(createPrepareTransaction(connectionInfo, txid));
- }
-
- // Since prepared but not committed.. they should not get delivered.
- assertNull(receiveMessage(connection));
- assertNoMessagesLeft(connection);
- connection.request(closeConnectionInfo(connectionInfo));
-
- // restart the broker.
- stopBroker();
- if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
- KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
- adapter.setPurgeRecoveredXATransactionStrategy("ROLLBACK");
- LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
- }
- broker.start();
-
- // Setup the consumer and try receive the message.
- connection = createConnection();
- connectionInfo = createConnectionInfo();
- sessionInfo = createSessionInfo(connectionInfo);
- connection.send(connectionInfo);
- connection.send(sessionInfo);
- consumerInfo = createConsumerInfo(sessionInfo, destination);
- connection.send(consumerInfo);
-
- // Since rolledback but not committed.. they should not get delivered.
- assertNull(receiveMessage(connection));
- assertNoMessagesLeft(connection);
-
- Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
- assertNotNull(response);
- DataArrayResponse dar = (DataArrayResponse)response;
-
- //These should be purged so expect 0
- assertEquals(0, dar.getData().length);
-
- }
-
- public void testPreparedTransactionRecoveredPurgeCommitOnRestart() throws Exception {
-
- ActiveMQDestination destination = createDestination();
-
- // Setup the producer and send the message.
- StubConnection connection = createConnection();
- ConnectionInfo connectionInfo = createConnectionInfo();
- SessionInfo sessionInfo = createSessionInfo(connectionInfo);
- ProducerInfo producerInfo = createProducerInfo(sessionInfo);
- connection.send(connectionInfo);
- connection.send(sessionInfo);
- connection.send(producerInfo);
- ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
- connection.send(consumerInfo);
-
- // Prepare 4 message sends.
- for (int i = 0; i < 4; i++) {
- // Begin the transaction.
- XATransactionId txid = createXATransaction(sessionInfo);
- connection.send(createBeginTransaction(connectionInfo, txid));
-
- Message message = createMessage(producerInfo, destination);
- message.setPersistent(true);
- message.setTransactionId(txid);
- connection.send(message);
-
- // Prepare
- connection.send(createPrepareTransaction(connectionInfo, txid));
- }
-
- // Since prepared but not committed.. they should not get delivered.
- assertNull(receiveMessage(connection));
- assertNoMessagesLeft(connection);
- connection.request(closeConnectionInfo(connectionInfo));
-
- // restart the broker.
- stopBroker();
- if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
- KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
- adapter.setPurgeRecoveredXATransactionStrategy("COMMIT");
- LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
- }
- broker.start();
-
- // Setup the consumer and try receive the message.
- connection = createConnection();
- connectionInfo = createConnectionInfo();
- sessionInfo = createSessionInfo(connectionInfo);
- connection.send(connectionInfo);
- connection.send(sessionInfo);
- consumerInfo = createConsumerInfo(sessionInfo, destination);
- connection.send(consumerInfo);
-
- // Since committed ... they should get delivered.
- for (int i = 0; i < 4; i++) {
- assertNotNull(receiveMessage(connection));
- }
- assertNoMessagesLeft(connection);
-
- Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
- assertNotNull(response);
- DataArrayResponse dar = (DataArrayResponse)response;
-
- //These should be purged so expect 0
- assertEquals(0, dar.getData().length);
-
- }
-
private void assertEmptyDLQ() throws Exception {
try {
DestinationViewMBean destinationView = getProxyToDestination(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
http://git-wip-us.apache.org/repos/asf/activemq/blob/b92aaa2f/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java
index e73bd7c..2bbdb68 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java
@@ -63,10 +63,10 @@ public class AMQ6463Test extends JmsTestSupport {
TextMessage message = session.createTextMessage("test msg");
final int numMessages = 20;
- long time = 5;
+
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
- message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
- message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 5);
+ message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 0);
+ message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 0);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, numMessages - 1);
producer.send(message);
@@ -78,14 +78,14 @@ public class AMQ6463Test extends JmsTestSupport {
public boolean isSatisified() throws Exception {
return gotUsageBlocked.get();
}
- }));
+ }, 60000));
MessageConsumer consumer = session.createConsumer(queueA);
TextMessage msg;
for (int idx = 0; idx < numMessages; ++idx) {
- msg = (TextMessage) consumer.receive(10000);
- assertNotNull("received: " + idx, msg);
- msg.acknowledge();
+ msg = (TextMessage) consumer.receive(10000);
+ assertNotNull("received: " + idx, msg);
+ msg.acknowledge();
}
assertTrue("no errors in the log", errors.get() == 0);
assertTrue("got blocked message", gotUsageBlocked.get());
@@ -99,6 +99,8 @@ public class AMQ6463Test extends JmsTestSupport {
service.setSchedulerSupport(true);
service.setDeleteAllMessagesOnStartup(true);
+ service.getSystemUsage().getMemoryUsage().setLimit(512);
+
// Setup a destination policy where it takes only 1 message at a time.
PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry();
@@ -131,7 +133,7 @@ public class AMQ6463Test extends JmsTestSupport {
super.setUp();
}
-
+
protected void tearDown() throws Exception {
org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
rootLogger.removeAppender(appender);