You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/10/17 17:16:01 UTC
svn commit: r1399302 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/Topic.java
test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
Author: dejanb
Date: Wed Oct 17 15:16:01 2012
New Revision: 1399302
URL: http://svn.apache.org/viewvc?rev=1399302&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4108 - master broker advisory topic needs to be retroactive
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1399302&r1=1399301&r2=1399302&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Wed Oct 17 15:16:01 2012
@@ -27,10 +27,12 @@ import java.util.concurrent.CopyOnWriteA
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
+import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
@@ -84,7 +86,12 @@ public class Topic extends BaseDestinati
super(brokerService, store, destination, parentStats);
this.topicStore = store;
// set default subscription recovery policy
- subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
+ if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
+ subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
+ setAlwaysRetroactive(true);
+ } else {
+ subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
+ }
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java?rev=1399302&r1=1399301&r2=1399302&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java Wed Oct 17 15:16:01 2012
@@ -21,11 +21,13 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
+import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -132,4 +134,14 @@ public class QueueMasterSlaveTest extend
assertNotNull("Get message after failover", message);
assertEquals("correct message", text, ((TextMessage)message).getText());
}
+
+ public void testAdvisory() throws Exception {
+ MessageConsumer advConsumer = session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic());
+
+ master.stop();
+ assertTrue("slave started", slaveStarted.await(15, TimeUnit.SECONDS));
+ Message advisoryMessage = advConsumer.receive(5000);
+ assertNotNull("Didn't received advisory", advisoryMessage);
+
+ }
}