You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/08/09 19:09:49 UTC
svn commit: r1155437 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/jmx/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/cursors/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Tue Aug 9 17:09:49 2011
New Revision: 1155437
URL: http://svn.apache.org/viewvc?rev=1155437&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3442: Use the real durable subscription key as the MBean name for inactive durable subscriptions, improve metrics, and fix the inflight count on deactivate. Resolved.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=1155437&r1=1155436&r2=1155437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Tue Aug 9 17:09:49 2011
@@ -392,7 +392,7 @@ public class DestinationView implements
int index = 0;
for (Subscription subscription : subscriptions) {
String connectionClientId = subscription.getContext().getClientId();
- String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription, connectionClientId, objectName);
+ String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription.getConsumerInfo(), connectionClientId, objectName);
answer[index++] = new ObjectName(objectNameStr);
}
return answer;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java?rev=1155437&r1=1155436&r2=1155437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java Tue Aug 9 17:09:49 2011
@@ -21,6 +21,8 @@ import javax.management.openmbean.OpenDa
import javax.management.openmbean.TabularData;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SubscriptionInfo;
@@ -36,12 +38,12 @@ public class InactiveDurableSubscription
*
* @param broker
* @param clientId
- * @param sub
+ * @param subInfo
*/
- public InactiveDurableSubscriptionView(ManagedRegionBroker broker, String clientId, SubscriptionInfo sub) {
- super(broker,clientId, null);
+ public InactiveDurableSubscriptionView(ManagedRegionBroker broker, String clientId, SubscriptionInfo subInfo, Subscription subscription) {
+ super(broker,clientId, subscription);
this.broker = broker;
- this.subscriptionInfo = sub;
+ this.subscriptionInfo = subInfo;
}
/**
@@ -94,6 +96,12 @@ public class InactiveDurableSubscription
return false;
}
+ @Override
+ protected ConsumerInfo getConsumerInfo() {
+ // when inactive, consumer info is stale
+ return null;
+ }
+
/**
* Browse messages for this durable subscriber
*
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1155437&r1=1155436&r2=1155437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Tue Aug 9 17:09:49 2011
@@ -185,7 +185,7 @@ public class ManagedRegionBroker extends
public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
String connectionClientId = context.getClientId();
ObjectName brokerJmxObjectName = brokerObjectName;
- String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName);
+ String objectNameStr = getSubscriptionObjectName(sub.getConsumerInfo(), connectionClientId, brokerJmxObjectName);
SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
try {
ObjectName objectName = new ObjectName(objectNameStr);
@@ -196,7 +196,7 @@ public class ManagedRegionBroker extends
info.setClientId(context.getClientId());
info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
info.setDestination(sub.getConsumerInfo().getDestination());
- addInactiveSubscription(key, info);
+ addInactiveSubscription(key, info, sub);
} else {
if (sub.getConsumerInfo().isDurable()) {
view = new DurableSubscriptionView(this, context.getClientId(), sub);
@@ -217,21 +217,21 @@ public class ManagedRegionBroker extends
}
}
- public static String getSubscriptionObjectName(Subscription sub, String connectionClientId, ObjectName brokerJmxObjectName) {
+ public static String getSubscriptionObjectName(ConsumerInfo info, String connectionClientId, ObjectName brokerJmxObjectName) {
Hashtable map = brokerJmxObjectName.getKeyPropertyList();
String brokerDomain = brokerJmxObjectName.getDomain();
String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
- String destinationType = "destinationType=" + sub.getConsumerInfo().getDestination().getDestinationTypeAsString();
- String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName());
+ String destinationType = "destinationType=" + info.getDestination().getDestinationTypeAsString();
+ String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(info.getDestination().getPhysicalName());
String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
String persistentMode = "persistentMode=";
String consumerId = "";
- if (sub.getConsumerInfo().isDurable()) {
- persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName());
+ if (info.isDurable()) {
+ persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(info.getSubscriptionName());
} else {
persistentMode += "Non-Durable";
- if (sub.getConsumerInfo() != null && sub.getConsumerInfo().getConsumerId() != null) {
- consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString());
+ if (info.getConsumerId() != null) {
+ consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(info.getConsumerId().toString());
}
}
objectNameStr += persistentMode + ",";
@@ -482,7 +482,7 @@ public class ManagedRegionBroker extends
info.setClientId(subscriptionKey.getClientId());
info.setSubscriptionName(subscriptionKey.getSubscriptionName());
info.setDestination(new ActiveMQTopic(view.getDestinationName()));
- addInactiveSubscription(subscriptionKey, info);
+ addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
}
}
}
@@ -512,7 +512,7 @@ public class ManagedRegionBroker extends
Map.Entry entry = (Entry)i.next();
SubscriptionKey key = (SubscriptionKey)entry.getKey();
SubscriptionInfo info = (SubscriptionInfo)entry.getValue();
- addInactiveSubscription(key, info);
+ addInactiveSubscription(key, info, null);
}
}
@@ -525,12 +525,11 @@ public class ManagedRegionBroker extends
return known;
}
- protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info) {
- Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
+ protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
try {
- ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type=Subscription," + "active=false,"
- + "name=" + JMXSupport.encodeObjectNamePart(key.toString()) + "");
- SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info);
+ ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
+ ObjectName objectName = new ObjectName(getSubscriptionObjectName(offlineConsumerInfo, info.getClientId(), brokerObjectName));
+ SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription);
try {
AnnotatedMBean.registerMBean(managementContext, view, objectName);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1155437&r1=1155436&r2=1155437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Aug 9 17:09:49 2011
@@ -47,7 +47,7 @@ public class DurableTopicSubscription ex
private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
- private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
+ private final ConcurrentHashMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
private final SubscriptionKey subscriptionKey;
private final boolean keepDurableSubsActive;
private AtomicBoolean active = new AtomicBoolean();
@@ -96,12 +96,14 @@ public class DurableTopicSubscription ex
}
public void add(ConnectionContext context, Destination destination) throws Exception {
- super.add(context, destination);
+ if (!destinations.contains(destination)) {
+ super.add(context, destination);
+ }
// do it just once per destination
- if (destinations.containsKey(destination.getActiveMQDestination())) {
- return;
+ if (durableDestinations.containsKey(destination.getActiveMQDestination())) {
+ return;
}
- destinations.put(destination.getActiveMQDestination(), destination);
+ durableDestinations.put(destination.getActiveMQDestination(), destination);
if (active.get() || keepDurableSubsActive) {
Topic topic = (Topic)destination;
@@ -130,7 +132,7 @@ public class DurableTopicSubscription ex
this.info = info;
LOG.debug("Activating " + this);
if (!keepDurableSubsActive) {
- for (Iterator<Destination> iter = destinations.values()
+ for (Iterator<Destination> iter = durableDestinations.values()
.iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next();
add(context, topic);
@@ -146,7 +148,7 @@ public class DurableTopicSubscription ex
// If nothing was in the persistent store, then try to use the
// recovery policy.
if (pending.isEmpty()) {
- for (Iterator<Destination> iter = destinations.values()
+ for (Iterator<Destination> iter = durableDestinations.values()
.iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next();
topic.recoverRetroactiveMessages(context, this);
@@ -168,10 +170,12 @@ public class DurableTopicSubscription ex
synchronized (pending) {
pending.stop();
}
- if (!keepDurableSubsActive) {
- for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
- Topic topic = (Topic)iter.next();
+ for (Iterator<Destination> iter = durableDestinations.values().iterator(); iter.hasNext();) {
+ Topic topic = (Topic)iter.next();
+ if (!keepDurableSubsActive) {
topic.deactivate(context, this);
+ } else {
+ topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
}
}
@@ -270,7 +274,7 @@ public class DurableTopicSubscription ex
public synchronized String toString() {
- return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending="
+ return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + enqueueCounter + ", pending="
+ getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=1155437&r1=1155436&r2=1155437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Tue Aug 9 17:09:49 2011
@@ -259,7 +259,7 @@ public class TopicRegion extends Abstrac
return rc;
}
- private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
+ public ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
ConsumerInfo rc = new ConsumerInfo();
rc.setSelector(info.getSelector());
rc.setSubscriptionName(info.getSubscriptionName());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1155437&r1=1155436&r2=1155437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Tue Aug 9 17:09:49 2011
@@ -261,16 +261,13 @@ public abstract class AbstractStoreCurso
this.batchResetNeeded = false;
}
if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) {
- this.storeHasMessages = false;
try {
doFillBatch();
} catch (Exception e) {
LOG.error(this + " - Failed to fill batch", e);
throw new RuntimeException(e);
}
- if (!this.batchList.isEmpty() || !hadSpace) {
- this.storeHasMessages=true;
- }
+ this.storeHasMessages = !this.batchList.isEmpty() || !hadSpace;
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1155437&r1=1155436&r2=1155437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Tue Aug 9 17:09:49 2011
@@ -25,10 +25,13 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
@@ -44,12 +47,15 @@ public class DurableSubscriptionOfflineT
private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class);
public boolean usePrioritySupport = Boolean.TRUE;
public int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
+ public boolean keepDurableSubsActive = true;
private BrokerService broker;
private ActiveMQTopic topic;
private Vector<Exception> exceptions = new Vector<Exception>();
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
- return new ActiveMQConnectionFactory("vm://" + getName(true));
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
+ connectionFactory.setWatchTopicAdvisories(false);
+ return connectionFactory;
}
@Override
@@ -89,6 +95,8 @@ public class DurableSubscriptionOfflineT
broker.setBrokerName(getName(true));
broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
broker.getManagementContext().setCreateConnector(false);
+ broker.setAdvisorySupport(false);
+ broker.setKeepDurableSubsActive(keepDurableSubsActive);
if (usePrioritySupport) {
PolicyEntry policy = new PolicyEntry();
@@ -322,6 +330,119 @@ public class DurableSubscriptionOfflineT
assertEquals("offline consumer got all", sent, listener.count);
}
+ public void initCombosForTestJMXCountersWithOfflineSubs() throws Exception {
+ this.addCombinationValues("keepDurableSubsActive",
+ new Object[]{Boolean.TRUE, Boolean.FALSE});
+ }
+
+ public void testJMXCountersWithOfflineSubs() throws Exception {
+ // create durable subscription 1
+ Connection con = createConnection("cliId1");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", null, true);
+ session.close();
+ con.close();
+
+ // restart broker
+ broker.stop();
+ createBroker(false /*deleteAllMessages*/);
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ producer.send(topic, message);
+ }
+ session.close();
+ con.close();
+
+ // consume some messages
+ con = createConnection("cliId1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+
+ for (int i=0; i<sent/2; i++) {
+ Message m = consumer.receive(4000);
+ assertNotNull("got message: " + i, m);
+ LOG.info("Got :" + i + ", " + m);
+ }
+
+ // check some counters while active
+ ObjectName activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+ LOG.info("active durable sub name: " + activeDurableSubName);
+ final DurableSubscriptionViewMBean durableSubscriptionView = (DurableSubscriptionViewMBean)
+ broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+ assertTrue("is active", durableSubscriptionView.isActive());
+ assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView.getEnqueueCounter());
+ assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge();
+ }
+ }));
+ assertEquals("correct dequeue", 5, durableSubscriptionView.getDequeueCounter());
+
+
+ ObjectName destinationName = broker.getAdminView().getTopics()[0];
+ TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
+ assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+ assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+ assertEquals("inflight", 5, topicView.getInFlightCount());
+
+ session.close();
+ con.close();
+
+ // check some counters when inactive
+ ObjectName inActiveDurableSubName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
+ LOG.info("inactive durable sub name: " + inActiveDurableSubName);
+ DurableSubscriptionViewMBean durableSubscriptionView1 = (DurableSubscriptionViewMBean)
+ broker.getManagementContext().newProxyInstance(inActiveDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+ assertTrue("is not active", !durableSubscriptionView1.isActive());
+ assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView1.getEnqueueCounter());
+ assertEquals("correct awaiting ack", 0, durableSubscriptionView1.getMessageCountAwaitingAcknowledge());
+ assertEquals("correct dequeue", keepDurableSubsActive ? 5 : 0, durableSubscriptionView1.getDequeueCounter());
+
+ // destination view
+ assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+ assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+ assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
+
+ // consume the rest
+ con = createConnection("cliId1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+
+ for (int i=0; i<sent/2;i++) {
+ Message m = consumer.receive(30000);
+ assertNotNull("got message: " + i, m);
+ LOG.info("Got :" + i + ", " + m);
+ }
+
+ activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+ LOG.info("durable sub name: " + activeDurableSubName);
+ final DurableSubscriptionViewMBean durableSubscriptionView2 = (DurableSubscriptionViewMBean)
+ broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+ assertTrue("is active", durableSubscriptionView2.isActive());
+ assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView2.getEnqueueCounter());
+ assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ long val = durableSubscriptionView2.getDequeueCounter();
+ LOG.info("dequeue count:" + val);
+ return 10 == val;
+ }
+ }));
+
+ }
+
public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
@@ -1062,6 +1183,8 @@ public class DurableSubscriptionOfflineT
public void initCombosForTestCleanupDeletedSubAfterRestart() throws Exception {
this.addCombinationValues("journalMaxFileLength",
new Object[]{new Integer(64 * 1024)});
+ this.addCombinationValues("keepDurableSubsActive",
+ new Object[]{Boolean.TRUE, Boolean.FALSE});
}
// https://issues.apache.org/jira/browse/AMQ-3206