You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/05/20 14:43:37 UTC
[8/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-4842 - the store needed to be shared between master and slave
https://issues.apache.org/jira/browse/AMQ-4842 - the store needed to be shared between master and slave
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c07d6c84
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c07d6c84
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c07d6c84
Branch: refs/heads/master
Commit: c07d6c841dce765d58fc97139d3e76873a257b59
Parents: 9ae5b41
Author: gtully <ga...@gmail.com>
Authored: Wed May 20 13:42:49 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed May 20 13:42:49 2015 +0100
----------------------------------------------------------------------
.../broker/ft/QueueMasterSlaveTestSupport.java | 27 ++++++++++++++------
.../ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java | 9 ++++++-
2 files changed, 27 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/c07d6c84/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java
index a7a525e..570b5f1 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java
@@ -31,6 +31,7 @@ import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,28 +135,38 @@ abstract public class QueueMasterSlaveTestSupport extends JmsTopicSendReceiveWit
assertTrue("slave started", slaveStarted.await(60, TimeUnit.SECONDS));
assertTrue(!slave.get().isSlave());
+ LOG.info("Sending post failover message to VT");
+
final String text = "ForUWhenSlaveKicksIn";
producer.send(new ActiveMQTopic("VirtualTopic.TA1"), session.createTextMessage(text));
+ // dest must survive failover - consumer created after send
qConsumer = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
- javax.jms.Message message = qConsumer.receive(20000);
+ javax.jms.Message message = qConsumer.receive(10000);
assertNotNull("Get message after failover", message);
assertEquals("correct message", text, ((TextMessage)message).getText());
}
public void testAdvisory() throws Exception {
- MessageConsumer advConsumer = session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic());
- Message advisoryMessage = advConsumer.receive(5000);
- LOG.info("received " + advisoryMessage);
- assertNotNull("Didn't received advisory", advisoryMessage);
+ final MessageConsumer advConsumer = session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic());
+ final Message[] advisoryMessage = new Message[1];
+ advisoryMessage[0] = advConsumer.receive(5000);
+ LOG.info("received " + advisoryMessage[0]);
+ assertNotNull("Didn't received advisory", advisoryMessage[0]);
master.stop();
assertTrue("slave started", slaveStarted.await(60, TimeUnit.SECONDS));
LOG.info("slave started");
- advisoryMessage = advConsumer.receive(20000);
- LOG.info("received " + advisoryMessage);
- assertNotNull("Didn't received advisory", advisoryMessage);
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ advisoryMessage[0] = advConsumer.receive(500);
+ return advisoryMessage[0] != null;
+ }
+ });
+ LOG.info("received " + advisoryMessage[0]);
+ assertNotNull("Didn't received advisory", advisoryMessage[0]);
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/c07d6c84/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java
index 4cfb5e3..c14506f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.ft;
+import java.io.File;
import java.io.IOException;
import java.net.URI;
import javax.sql.DataSource;
@@ -33,11 +34,13 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
protected DataSource sharedDs;
protected String MASTER_URL = "tcp://localhost:62001";
protected String SLAVE_URL = "tcp://localhost:62002";
+ File sharedDbDirFile;
@Override
protected void setUp() throws Exception {
// startup db
sharedDs = new SyncCreateDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
+ sharedDbDirFile = new File(new File(IOHelper.getDefaultDataDirectory()), "sharedKahaDB");
super.setUp();
}
@@ -47,6 +50,7 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
DataSourceServiceSupport.shutdownDefaultDataSource(((SyncCreateDataSource)sharedDs).getDelegate());
}
+ @Override
protected void createMaster() throws Exception {
master = new BrokerService();
master.setBrokerName("master");
@@ -55,6 +59,7 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
master.setPersistent(true);
master.setDeleteAllMessagesOnStartup(true);
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) master.getPersistenceAdapter();
+ kahaDBPersistenceAdapter.setDirectory(sharedDbDirFile);
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
leaseDatabaseLocker.setCreateTablesOnStartup(true);
leaseDatabaseLocker.setDataSource(getExistingDataSource());
@@ -63,6 +68,7 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
configureLocker(kahaDBPersistenceAdapter);
configureBroker(master);
master.start();
+ master.waitUntilStarted();
}
protected void configureBroker(BrokerService brokerService) {
@@ -86,14 +92,15 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
broker.setUseJmx(false);
broker.setPersistent(true);
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+ kahaDBPersistenceAdapter.setDirectory(sharedDbDirFile);
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
leaseDatabaseLocker.setDataSource(getExistingDataSource());
leaseDatabaseLocker.setStatements(new Statements());
kahaDBPersistenceAdapter.setLocker(leaseDatabaseLocker);
configureLocker(kahaDBPersistenceAdapter);
configureBroker(broker);
- broker.start();
slave.set(broker);
+ broker.start();
slaveStarted.countDown();
} catch (IllegalStateException expectedOnShutdown) {
} catch (Exception e) {