You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/07/11 01:11:37 UTC

svn commit: r1359949 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Topic.java test/java/org/apache/activemq/usecases/DurableSubscriptionActivationTest.java

Author: tabish
Date: Tue Jul 10 23:11:36 2012
New Revision: 1359949

URL: http://svn.apache.org/viewvc?rev=1359949&view=rev
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-3921

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionActivationTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1359949&r1=1359948&r2=1359949&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Jul 10 23:11:36 2012
@@ -222,7 +222,9 @@ public class Topic extends BaseDestinati
                     info = null;
                 } else {
                     synchronized (consumers) {
-                        consumers.add(subscription);
+                    	if (!consumers.contains(subscription)) {
+                    		consumers.add(subscription);
+                    	}
                     }
                 }
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionActivationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionActivationTest.java?rev=1359949&r1=1359948&r2=1359949&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionActivationTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionActivationTest.java Tue Jul 10 23:11:36 2012
@@ -27,7 +27,7 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 
 public class DurableSubscriptionActivationTest extends org.apache.activemq.TestSupport {
 
@@ -47,7 +47,7 @@ public class DurableSubscriptionActivati
 
     protected void setUp() throws Exception {
         topic = (ActiveMQTopic) createDestination();
-        createBroker();
+        createBroker(true);
         super.setUp();
     }
 
@@ -58,16 +58,18 @@ public class DurableSubscriptionActivati
 
     protected void restartBroker() throws Exception {
         destroyBroker();
-        createBroker();
+        createBroker(false);
     }
 
-    private void createBroker() throws Exception {
+    private void createBroker(boolean delete) throws Exception {
         broker = BrokerFactory.createBroker("broker:(vm://localhost)");
         broker.setKeepDurableSubsActive(true);
         broker.setPersistent(true);
-        AMQPersistenceAdapter persistenceAdapter = new AMQPersistenceAdapter();
-        persistenceAdapter.setDirectory(new File("activemq-data/" + getName()));
-        broker.setPersistenceAdapter(persistenceAdapter);
+        broker.setDeleteAllMessagesOnStartup(delete);
+        KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
+        kahadb.setDirectory(new File("activemq-data/" + getName() + "-kahadb"));
+        kahadb.setJournalMaxFileLength(500 * 1024);
+        broker.setPersistenceAdapter(kahadb);
         broker.setBrokerName(getName());
 
         // only if we pre-create the destinations
@@ -85,14 +87,37 @@ public class DurableSubscriptionActivati
             broker.stop();
     }
 
-    public void testActivateWithExistingTopic() throws Exception {
+    public void testActivateWithExistingTopic1() throws Exception {
         // create durable subscription
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         session.createDurableSubscriber(topic, "SubsId");
 
         Destination d = broker.getDestination(topic);
-        assertTrue("More than one consumer.", d.getConsumers().size() == 1);
-        
+        assertTrue("More than one consumer found: " + d.getConsumers().size(), d.getConsumers().size() == 1);
+
+        // restart the broker
+        restartBroker();
+
+        // activate
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId");
+
+        assertTrue("More than one consumer found: " + d.getConsumers().size(), d.getConsumers().size() == 1);
+
+        // re-activate
+        connection.close();
+        connection = createConnection();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId");
+
+        assertTrue("More than one consumer found: " + d.getConsumers().size(), d.getConsumers().size() == 1);
+    }
+
+    public void testActivateWithExistingTopic2() throws Exception {
+        // create durable subscription
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId");
+
         // restart the broker
         restartBroker();
 
@@ -100,7 +125,8 @@ public class DurableSubscriptionActivati
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         session.createDurableSubscriber(topic, "SubsId");
 
-        assertTrue("More than one consumer.", d.getConsumers().size() == 1);
+        Destination d = broker.getDestination(topic);
+        assertTrue("More than one consumer found: " + d.getConsumers().size(), d.getConsumers().size() == 1);
 
         // re-activate
         connection.close();
@@ -108,6 +134,6 @@ public class DurableSubscriptionActivati
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         session.createDurableSubscriber(topic, "SubsId");
 
-        assertTrue("More than one consumer.", d.getConsumers().size() == 1);
+        assertTrue("More than one consumer found: " + d.getConsumers().size(), d.getConsumers().size() == 1);
     }
 }