You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2013/08/07 16:36:40 UTC

svn commit: r1511333 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-unit-tests/src/test/java/org/apache/activemq/bugs/ activemq-un...

Author: dejanb
Date: Wed Aug  7 14:36:40 2013
New Revision: 1511333

URL: http://svn.apache.org/r1511333
Log:
https://issues.apache.org/jira/browse/AMQ-4656 - first stab at improving keepDurableSubsActive feature, by not stopping/starting cursor on subscription (de)activation

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayWithRestartTest.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1511333&r1=1511332&r2=1511333&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Aug  7 14:36:40 2013
@@ -162,17 +162,19 @@ public class DurableTopicSubscription ex
             }
 
             synchronized (pendingLock) {
-                pending.setSystemUsage(memoryManager);
-                pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
-                pending.setMaxAuditDepth(getMaxAuditDepth());
-                pending.setMaxProducersToAudit(getMaxProducersToAudit());
-                pending.start();
-                // If nothing was in the persistent store, then try to use the
-                // recovery policy.
-                if (pending.isEmpty()) {
-                    for (Destination destination : durableDestinations.values()) {
-                        Topic topic = (Topic) destination;
-                        topic.recoverRetroactiveMessages(context, this);
+                if (!((StoreDurableSubscriberCursor) pending).isStarted() || !keepDurableSubsActive) {
+                    pending.setSystemUsage(memoryManager);
+                    pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
+                    pending.setMaxAuditDepth(getMaxAuditDepth());
+                    pending.setMaxProducersToAudit(getMaxProducersToAudit());
+                    pending.start();
+                    // If nothing was in the persistent store, then try to use the
+                    // recovery policy.
+                    if (pending.isEmpty()) {
+                        for (Destination destination : durableDestinations.values()) {
+                            Topic topic = (Topic) destination;
+                            topic.recoverRetroactiveMessages(context, this);
+                        }
                     }
                 }
             }
@@ -195,7 +197,9 @@ public class DurableTopicSubscription ex
         List<MessageReference> savedDispateched = null;
 
         synchronized (pendingLock) {
-            pending.stop();
+            if (!keepDurableSubsActive) {
+                pending.stop();
+            }
 
             synchronized (dispatchLock) {
                 for (Destination destination : durableDestinations.values()) {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=1511333&r1=1511332&r2=1511333&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Wed Aug  7 14:36:40 2013
@@ -309,7 +309,7 @@ public abstract class AbstractPendingMes
         }
     }
     
-    protected synchronized boolean isStarted() {
+    public synchronized boolean isStarted() {
         return started;
     }
     

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1511333&r1=1511332&r2=1511333&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Wed Aug  7 14:36:40 2013
@@ -211,7 +211,7 @@ public abstract class AbstractStoreCurso
     }
 
     
-    public final synchronized void addMessageFirst(MessageReference node) throws Exception {
+    public synchronized void addMessageFirst(MessageReference node) throws Exception {
         setCacheEnabled(false);
         size++;
     }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=1511333&r1=1511332&r2=1511333&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Wed Aug  7 14:36:40 2013
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
@@ -59,6 +60,11 @@ class TopicStorePrefetch extends Abstrac
         // shouldn't get called
         throw new RuntimeException("Not supported");
     }
+
+    public synchronized void addMessageFirst(MessageReference node) throws Exception {
+        batchList.addMessageFirst(node);
+        size++;
+    }
     
         
     @Override

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=1511333&r1=1511332&r2=1511333&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java Wed Aug  7 14:36:40 2013
@@ -213,6 +213,7 @@ public class DurableConsumerTest extends
     public void testConcurrentDurableConsumer() throws Exception{
         
         broker.start();
+        broker.waitUntilStarted();
         
         factory = createConnectionFactory();
         final String topicName = getName();
@@ -253,7 +254,7 @@ public class DurableConsumerTest extends
                                 }
                             }
                         } while (msg == null);
-                        
+
                         consumerConnection.close();
                     }
                     assertTrue(received >= acked);
@@ -408,6 +409,7 @@ public class DurableConsumerTest extends
         super.tearDown();
         if (broker != null) {
             broker.stop();
+            broker.waitUntilStopped();
             broker = null;
         }
     }

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayWithRestartTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayWithRestartTest.java?rev=1511333&r1=1511332&r2=1511333&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayWithRestartTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayWithRestartTest.java Wed Aug  7 14:36:40 2013
@@ -225,8 +225,10 @@ public class DurableSubSelectorDelayWith
                 } while (true);
 
             } finally {
-                sess.close();
-                con.close();
+                try {
+                    sess.close();
+                    con.close();
+                } catch (Exception e) {}
 
                 LOG.info(toString() + " OFFLINE.");
             }

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1511333&r1=1511332&r2=1511333&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Wed Aug  7 14:36:40 2013
@@ -130,6 +130,7 @@ public class DurableSubscriptionOfflineT
             ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
         }
         broker.start();
+        broker.waitUntilStarted();
     }
 
     private void destroyBroker() throws Exception {
@@ -1120,6 +1121,7 @@ public class DurableSubscriptionOfflineT
             LOG.info("Iteration: " + i);
             doTestOrderOnActivateDeactivate();
             broker.stop();
+            broker.waitUntilStopped();
             createBroker(true /*deleteAllMessages*/);
         }
     }