You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2013/08/07 16:36:40 UTC
svn commit: r1511333 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/broker/region/
activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/
activemq-unit-tests/src/test/java/org/apache/activemq/bugs/ activemq-un...
Author: dejanb
Date: Wed Aug 7 14:36:40 2013
New Revision: 1511333
URL: http://svn.apache.org/r1511333
Log:
https://issues.apache.org/jira/browse/AMQ-4656 - first stab at improving keepDurableSubsActive feature, by not stoping/starting cursor on subscription (de)activating
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayWithRestartTest.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1511333&r1=1511332&r2=1511333&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Aug 7 14:36:40 2013
@@ -162,17 +162,19 @@ public class DurableTopicSubscription ex
}
synchronized (pendingLock) {
- pending.setSystemUsage(memoryManager);
- pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
- pending.setMaxAuditDepth(getMaxAuditDepth());
- pending.setMaxProducersToAudit(getMaxProducersToAudit());
- pending.start();
- // If nothing was in the persistent store, then try to use the
- // recovery policy.
- if (pending.isEmpty()) {
- for (Destination destination : durableDestinations.values()) {
- Topic topic = (Topic) destination;
- topic.recoverRetroactiveMessages(context, this);
+ if (!((StoreDurableSubscriberCursor) pending).isStarted() || !keepDurableSubsActive) {
+ pending.setSystemUsage(memoryManager);
+ pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
+ pending.setMaxAuditDepth(getMaxAuditDepth());
+ pending.setMaxProducersToAudit(getMaxProducersToAudit());
+ pending.start();
+ // If nothing was in the persistent store, then try to use the
+ // recovery policy.
+ if (pending.isEmpty()) {
+ for (Destination destination : durableDestinations.values()) {
+ Topic topic = (Topic) destination;
+ topic.recoverRetroactiveMessages(context, this);
+ }
}
}
}
@@ -195,7 +197,9 @@ public class DurableTopicSubscription ex
List<MessageReference> savedDispateched = null;
synchronized (pendingLock) {
- pending.stop();
+ if (!keepDurableSubsActive) {
+ pending.stop();
+ }
synchronized (dispatchLock) {
for (Destination destination : durableDestinations.values()) {
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=1511333&r1=1511332&r2=1511333&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Wed Aug 7 14:36:40 2013
@@ -309,7 +309,7 @@ public abstract class AbstractPendingMes
}
}
- protected synchronized boolean isStarted() {
+ public synchronized boolean isStarted() {
return started;
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1511333&r1=1511332&r2=1511333&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Wed Aug 7 14:36:40 2013
@@ -211,7 +211,7 @@ public abstract class AbstractStoreCurso
}
- public final synchronized void addMessageFirst(MessageReference node) throws Exception {
+ public synchronized void addMessageFirst(MessageReference node) throws Exception {
setCacheEnabled(false);
size++;
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=1511333&r1=1511332&r2=1511333&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Wed Aug 7 14:36:40 2013
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.cursors;
+import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
@@ -59,6 +60,11 @@ class TopicStorePrefetch extends Abstrac
// shouldn't get called
throw new RuntimeException("Not supported");
}
+
+ public synchronized void addMessageFirst(MessageReference node) throws Exception {
+ batchList.addMessageFirst(node);
+ size++;
+ }
@Override
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=1511333&r1=1511332&r2=1511333&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java Wed Aug 7 14:36:40 2013
@@ -213,6 +213,7 @@ public class DurableConsumerTest extends
public void testConcurrentDurableConsumer() throws Exception{
broker.start();
+ broker.waitUntilStarted();
factory = createConnectionFactory();
final String topicName = getName();
@@ -253,7 +254,7 @@ public class DurableConsumerTest extends
}
}
} while (msg == null);
-
+
consumerConnection.close();
}
assertTrue(received >= acked);
@@ -408,6 +409,7 @@ public class DurableConsumerTest extends
super.tearDown();
if (broker != null) {
broker.stop();
+ broker.waitUntilStopped();
broker = null;
}
}
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayWithRestartTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayWithRestartTest.java?rev=1511333&r1=1511332&r2=1511333&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayWithRestartTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayWithRestartTest.java Wed Aug 7 14:36:40 2013
@@ -225,8 +225,10 @@ public class DurableSubSelectorDelayWith
} while (true);
} finally {
- sess.close();
- con.close();
+ try {
+ sess.close();
+ con.close();
+ } catch (Exception e) {}
LOG.info(toString() + " OFFLINE.");
}
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1511333&r1=1511332&r2=1511333&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Wed Aug 7 14:36:40 2013
@@ -130,6 +130,7 @@ public class DurableSubscriptionOfflineT
((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
}
broker.start();
+ broker.waitUntilStarted();
}
private void destroyBroker() throws Exception {
@@ -1120,6 +1121,7 @@ public class DurableSubscriptionOfflineT
LOG.info("Iteration: " + i);
doTestOrderOnActivateDeactivate();
broker.stop();
+ broker.waitUntilStopped();
createBroker(true /*deleteAllMessages*/);
}
}