You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/07/11 01:11:37 UTC
svn commit: r1359949 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/Topic.java
test/java/org/apache/activemq/usecases/DurableSubscriptionActivationTest.java
Author: tabish
Date: Tue Jul 10 23:11:36 2012
New Revision: 1359949
URL: http://svn.apache.org/viewvc?rev=1359949&view=rev
Log:
fix and test fo:r https://issues.apache.org/jira/browse/AMQ-3921
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionActivationTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1359949&r1=1359948&r2=1359949&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Jul 10 23:11:36 2012
@@ -222,7 +222,9 @@ public class Topic extends BaseDestinati
info = null;
} else {
synchronized (consumers) {
- consumers.add(subscription);
+ if (!consumers.contains(subscription)) {
+ consumers.add(subscription);
+ }
}
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionActivationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionActivationTest.java?rev=1359949&r1=1359948&r2=1359949&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionActivationTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionActivationTest.java Tue Jul 10 23:11:36 2012
@@ -27,7 +27,7 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
public class DurableSubscriptionActivationTest extends org.apache.activemq.TestSupport {
@@ -47,7 +47,7 @@ public class DurableSubscriptionActivati
protected void setUp() throws Exception {
topic = (ActiveMQTopic) createDestination();
- createBroker();
+ createBroker(true);
super.setUp();
}
@@ -58,16 +58,18 @@ public class DurableSubscriptionActivati
protected void restartBroker() throws Exception {
destroyBroker();
- createBroker();
+ createBroker(false);
}
- private void createBroker() throws Exception {
+ private void createBroker(boolean delete) throws Exception {
broker = BrokerFactory.createBroker("broker:(vm://localhost)");
broker.setKeepDurableSubsActive(true);
broker.setPersistent(true);
- AMQPersistenceAdapter persistenceAdapter = new AMQPersistenceAdapter();
- persistenceAdapter.setDirectory(new File("activemq-data/" + getName()));
- broker.setPersistenceAdapter(persistenceAdapter);
+ broker.setDeleteAllMessagesOnStartup(delete);
+ KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
+ kahadb.setDirectory(new File("activemq-data/" + getName() + "-kahadb"));
+ kahadb.setJournalMaxFileLength(500 * 1024);
+ broker.setPersistenceAdapter(kahadb);
broker.setBrokerName(getName());
// only if we pre-create the destinations
@@ -85,14 +87,37 @@ public class DurableSubscriptionActivati
broker.stop();
}
- public void testActivateWithExistingTopic() throws Exception {
+ public void testActivateWithExistingTopic1() throws Exception {
// create durable subscription
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId");
Destination d = broker.getDestination(topic);
- assertTrue("More than one consumer.", d.getConsumers().size() == 1);
-
+ assertTrue("More than one consumer found: " + d.getConsumers().size(), d.getConsumers().size() == 1);
+
+ // restart the broker
+ restartBroker();
+
+ // activate
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId");
+
+ assertTrue("More than one consumer found: " + d.getConsumers().size(), d.getConsumers().size() == 1);
+
+ // re-activate
+ connection.close();
+ connection = createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId");
+
+ assertTrue("More than one consumer found: " + d.getConsumers().size(), d.getConsumers().size() == 1);
+ }
+
+ public void testActivateWithExistingTopic2() throws Exception {
+ // create durable subscription
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId");
+
// restart the broker
restartBroker();
@@ -100,7 +125,8 @@ public class DurableSubscriptionActivati
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId");
- assertTrue("More than one consumer.", d.getConsumers().size() == 1);
+ Destination d = broker.getDestination(topic);
+ assertTrue("More than one consumer found: " + d.getConsumers().size(), d.getConsumers().size() == 1);
// re-activate
connection.close();
@@ -108,6 +134,6 @@ public class DurableSubscriptionActivati
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId");
- assertTrue("More than one consumer.", d.getConsumers().size() == 1);
+ assertTrue("More than one consumer found: " + d.getConsumers().size(), d.getConsumers().size() == 1);
}
}