You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/05/20 14:43:37 UTC

[8/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-4842 - the store needed to be shared between master and slave

https://issues.apache.org/jira/browse/AMQ-4842 - the KahaDB store needs to be shared between master and slave; the test now points both brokers at the same store directory


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c07d6c84
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c07d6c84
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c07d6c84

Branch: refs/heads/master
Commit: c07d6c841dce765d58fc97139d3e76873a257b59
Parents: 9ae5b41
Author: gtully <ga...@gmail.com>
Authored: Wed May 20 13:42:49 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed May 20 13:42:49 2015 +0100

----------------------------------------------------------------------
 .../broker/ft/QueueMasterSlaveTestSupport.java  | 27 ++++++++++++++------
 .../ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java |  9 ++++++-
 2 files changed, 27 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c07d6c84/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java
index a7a525e..570b5f1 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java
@@ -31,6 +31,7 @@ import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
 import org.apache.activemq.xbean.BrokerFactoryBean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -134,28 +135,38 @@ abstract public class QueueMasterSlaveTestSupport extends JmsTopicSendReceiveWit
         assertTrue("slave started", slaveStarted.await(60, TimeUnit.SECONDS));
         assertTrue(!slave.get().isSlave());
 
+        LOG.info("Sending post failover message to VT");
+
         final String text = "ForUWhenSlaveKicksIn";
         producer.send(new ActiveMQTopic("VirtualTopic.TA1"), session.createTextMessage(text));
 
+        // dest must survive failover - consumer created after send
         qConsumer = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
 
-        javax.jms.Message message = qConsumer.receive(20000);
+        javax.jms.Message message = qConsumer.receive(10000);
         assertNotNull("Get message after failover", message);
         assertEquals("correct message", text, ((TextMessage)message).getText());
     }
 
     public void testAdvisory() throws Exception {
-        MessageConsumer advConsumer = session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic());
-        Message advisoryMessage = advConsumer.receive(5000);
-        LOG.info("received " + advisoryMessage);
-        assertNotNull("Didn't received advisory", advisoryMessage);
+        final MessageConsumer advConsumer = session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic());
+        final Message[] advisoryMessage = new Message[1];
+        advisoryMessage[0] = advConsumer.receive(5000);
+        LOG.info("received " + advisoryMessage[0]);
+        assertNotNull("Didn't received advisory", advisoryMessage[0]);
 
         master.stop();
         assertTrue("slave started", slaveStarted.await(60, TimeUnit.SECONDS));
         LOG.info("slave started");
-        advisoryMessage = advConsumer.receive(20000);
-        LOG.info("received " + advisoryMessage);
-        assertNotNull("Didn't received advisory", advisoryMessage);
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                advisoryMessage[0] = advConsumer.receive(500);
+                return advisoryMessage[0] != null;
+            }
+        });
+        LOG.info("received " + advisoryMessage[0]);
+        assertNotNull("Didn't received advisory", advisoryMessage[0]);
 
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/c07d6c84/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java
index 4cfb5e3..c14506f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.ft;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import javax.sql.DataSource;
@@ -33,11 +34,13 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
     protected DataSource sharedDs;
     protected String MASTER_URL = "tcp://localhost:62001";
     protected String SLAVE_URL  = "tcp://localhost:62002";
+    File sharedDbDirFile;
 
     @Override
     protected void setUp() throws Exception {
         // startup db
         sharedDs = new SyncCreateDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
+        sharedDbDirFile = new File(new File(IOHelper.getDefaultDataDirectory()), "sharedKahaDB");
         super.setUp();
     }
 
@@ -47,6 +50,7 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
         DataSourceServiceSupport.shutdownDefaultDataSource(((SyncCreateDataSource)sharedDs).getDelegate());
     }
 
+    @Override
     protected void createMaster() throws Exception {
         master = new BrokerService();
         master.setBrokerName("master");
@@ -55,6 +59,7 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
         master.setPersistent(true);
         master.setDeleteAllMessagesOnStartup(true);
         KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) master.getPersistenceAdapter();
+        kahaDBPersistenceAdapter.setDirectory(sharedDbDirFile);
         LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
         leaseDatabaseLocker.setCreateTablesOnStartup(true);
         leaseDatabaseLocker.setDataSource(getExistingDataSource());
@@ -63,6 +68,7 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
         configureLocker(kahaDBPersistenceAdapter);
         configureBroker(master);
         master.start();
+        master.waitUntilStarted();
     }
 
     protected void configureBroker(BrokerService brokerService) {
@@ -86,14 +92,15 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
                     broker.setUseJmx(false);
                     broker.setPersistent(true);
                     KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+                    kahaDBPersistenceAdapter.setDirectory(sharedDbDirFile);
                     LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
                     leaseDatabaseLocker.setDataSource(getExistingDataSource());
                     leaseDatabaseLocker.setStatements(new Statements());
                     kahaDBPersistenceAdapter.setLocker(leaseDatabaseLocker);
                     configureLocker(kahaDBPersistenceAdapter);
                     configureBroker(broker);
-                    broker.start();
                     slave.set(broker);
+                    broker.start();
                     slaveStarted.countDown();
                 } catch (IllegalStateException expectedOnShutdown) {
                 } catch (Exception e) {