You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/10/17 17:16:01 UTC

svn commit: r1399302 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Topic.java test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java

Author: dejanb
Date: Wed Oct 17 15:16:01 2012
New Revision: 1399302

URL: http://svn.apache.org/viewvc?rev=1399302&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4108 - master broker advisory topic needs to be retroactive

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1399302&r1=1399301&r2=1399302&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Wed Oct 17 15:16:01 2012
@@ -27,10 +27,12 @@ import java.util.concurrent.CopyOnWriteA
 import java.util.concurrent.Future;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
+import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
@@ -84,7 +86,12 @@ public class Topic extends BaseDestinati
         super(brokerService, store, destination, parentStats);
         this.topicStore = store;
         // set default subscription recovery policy
-        subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
+        if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
+            subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
+            setAlwaysRetroactive(true);
+        } else {
+            subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
+        }
         this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java?rev=1399302&r1=1399301&r2=1399302&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java Wed Oct 17 15:16:01 2012
@@ -21,11 +21,13 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -132,4 +134,14 @@ public class QueueMasterSlaveTest extend
         assertNotNull("Get message after failover", message);
         assertEquals("correct message", text, ((TextMessage)message).getText());
     }
+
+    public void testAdvisory() throws Exception {
+        MessageConsumer advConsumer = session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic());
+
+        master.stop();
+        assertTrue("slave started", slaveStarted.await(15, TimeUnit.SECONDS));
+        Message advisoryMessage = advConsumer.receive(5000);
+        assertNotNull("Didn't received advisory", advisoryMessage);
+
+    }
 }