You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2013/05/15 14:31:10 UTC
svn commit: r1482790 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/advisory/
activemq-broker/src/main/java/org/apache/activemq/broker/jmx/
activemq-broker/src/main/java/org/apache/activemq/broker/region/
activemq-unit-tests/sr...
Author: dejanb
Date: Wed May 15 12:31:09 2013
New Revision: 1482790
URL: http://svn.apache.org/r1482790
Log:
https://issues.apache.org/jira/browse/AMQ-4000 - as part of this feature we need to properly send advisories when durable sub unregisters
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Wed May 15 12:31:09 2013
@@ -25,29 +25,14 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.broker.region.*;
+import org.apache.activemq.command.*;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -264,6 +249,29 @@ public class AdvisoryBroker extends Brok
}
@Override
+ public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
+ SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
+
+ DurableTopicSubscription sub = ((TopicRegion)((RegionBroker)next).getTopicRegion()).getDurableSubscription(key);
+
+ if (sub == null) {
+ LOG.warn("We cannot send an advisory message for a durable sub removal when we don't know about the durable sub");
+ return;
+ }
+
+ ActiveMQDestination dest = sub.getConsumerInfo().getDestination();
+
+ super.removeSubscription(context, info);
+
+ // Don't advise advisory topics.
+ if (!AdvisorySupport.isAdvisoryTopic(dest)) {
+ ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
+ fireConsumerAdvisory(context, dest, topic, info);
+ }
+
+ }
+
+ @Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.removeProducer(context, info);
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Wed May 15 12:31:09 2013
@@ -405,7 +405,7 @@ public class BrokerView implements Broke
ConnectionContext context = new ConnectionContext();
context.setBroker(safeGetBroker());
context.setClientId(clientId);
- safeGetBroker().removeSubscription(context, info);
+ brokerService.getBroker().removeSubscription(context, info);
}
// doc comment inherited from BrokerViewMBean
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java Wed May 15 12:31:09 2013
@@ -20,6 +20,7 @@ import javax.management.openmbean.Compos
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
@@ -31,6 +32,7 @@ import org.apache.activemq.command.Remov
public class DurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean {
protected ManagedRegionBroker broker;
+ protected BrokerService brokerService;
protected String subscriptionName;
protected DurableTopicSubscription durableSub;
@@ -40,9 +42,10 @@ public class DurableSubscriptionView ext
* @param clientId
* @param sub
*/
- public DurableSubscriptionView(ManagedRegionBroker broker, String clientId, String userName, Subscription sub) {
+ public DurableSubscriptionView(ManagedRegionBroker broker, BrokerService brokerService, String clientId, String userName, Subscription sub) {
super(clientId, userName, sub);
this.broker = broker;
+ this.brokerService = brokerService;
this.durableSub=(DurableTopicSubscription) sub;
if (sub != null) {
this.subscriptionName = sub.getConsumerInfo().getSubscriptionName();
@@ -87,7 +90,7 @@ public class DurableSubscriptionView ext
ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
context.setClientId(clientId);
- broker.removeSubscription(context, info);
+ brokerService.getBroker().removeSubscription(context, info);
}
public String toString() {
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java Wed May 15 12:31:09 2013
@@ -20,6 +20,7 @@ import javax.management.openmbean.Compos
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerInfo;
@@ -41,8 +42,8 @@ public class InactiveDurableSubscription
* @param userName
* @param subInfo
*/
- public InactiveDurableSubscriptionView(ManagedRegionBroker broker, String clientId, SubscriptionInfo subInfo, Subscription subscription) {
- super(broker, clientId, null, subscription);
+ public InactiveDurableSubscriptionView(ManagedRegionBroker broker, BrokerService brokerService, String clientId, SubscriptionInfo subInfo, Subscription subscription) {
+ super(broker, brokerService, clientId, null, subscription);
this.broker = broker;
this.subscriptionInfo = subInfo;
}
@@ -134,7 +135,7 @@ public class InactiveDurableSubscription
ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
context.setClientId(clientId);
- broker.removeSubscription(context, info);
+ brokerService.getBroker().removeSubscription(context, info);
}
public String toString() {
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Wed May 15 12:31:09 2013
@@ -206,7 +206,7 @@ public class ManagedRegionBroker extends
} else {
String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
if (sub.getConsumerInfo().isDurable()) {
- view = new DurableSubscriptionView(this, context.getClientId(), userName, sub);
+ view = new DurableSubscriptionView(this, brokerService, context.getClientId(), userName, sub);
} else {
if (sub instanceof TopicSubscription) {
view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub);
@@ -509,7 +509,7 @@ public class ManagedRegionBroker extends
try {
ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, info.getClientId(), offlineConsumerInfo);
- SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription);
+ SubscriptionView view = new InactiveDurableSubscriptionView(this, brokerService, key.getClientId(), info, subscription);
try {
AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Wed May 15 12:31:09 2013
@@ -411,6 +411,7 @@ public class RegionBroker extends EmptyB
topicRegion.removeSubscription(context, info);
} finally {
inactiveDestinationsPurgeLock.readLock().unlock();
+
}
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Wed May 15 12:31:09 2013
@@ -378,4 +378,8 @@ public class TopicRegion extends Abstrac
public boolean durableSubscriptionExists(SubscriptionKey key) {
return this.durableSubscriptions.containsKey(key);
}
+
+ public DurableTopicSubscription getDurableSubscription(SubscriptionKey key) {
+ return durableSubscriptions.get(key);
+ }
}
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java Wed May 15 12:31:09 2013
@@ -18,15 +18,16 @@ package org.apache.activemq.usecases;
import java.io.File;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.Connection;
-import javax.jms.Session;
+import javax.jms.*;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
+import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
@@ -34,6 +35,7 @@ import org.apache.activemq.broker.jmx.Du
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@@ -70,12 +72,14 @@ public class DurableSubscriptionUnsubscr
public void doJMXUnsubscribe(boolean restart) throws Exception {
createSubscriptions();
+ createAdvisorySubscription();
Thread.sleep(1000);
assertCount(100, 0);
if (restart) {
restartBroker();
+ createAdvisorySubscription();
assertCount(100, 0);
}
@@ -97,12 +101,14 @@ public class DurableSubscriptionUnsubscr
if (restart) {
restartBroker();
+ createAdvisorySubscription();
assertCount(0, 0);
}
}
public void doConnectionUnsubscribe(boolean restart) throws Exception {
createSubscriptions();
+ createAdvisorySubscription();
Thread.sleep(1000);
assertCount(100, 0);
@@ -131,6 +137,7 @@ public class DurableSubscriptionUnsubscr
if (restart) {
restartBroker();
+ createAdvisorySubscription();
assertCount(100, 0);
}
@@ -150,18 +157,21 @@ public class DurableSubscriptionUnsubscr
if (restart) {
restartBroker();
+ createAdvisorySubscription();
assertCount(0, 0);
}
}
public void doDirectUnsubscribe(boolean restart) throws Exception {
createSubscriptions();
+ createAdvisorySubscription();
Thread.sleep(1000);
assertCount(100, 0);
if (restart) {
restartBroker();
+ createAdvisorySubscription();
assertCount(100, 0);
}
@@ -172,9 +182,10 @@ public class DurableSubscriptionUnsubscr
ConnectionContext context = new ConnectionContext();
context.setBroker(broker.getRegionBroker());
context.setClientId(getName());
- broker.getRegionBroker().removeSubscription(context, info);
+ broker.getBroker().removeSubscription(context, info);
if (i % 20 == 0) {
+ Thread.sleep(1000);
assertCount(100 - i - 1, 0);
}
}
@@ -183,6 +194,7 @@ public class DurableSubscriptionUnsubscr
if (restart) {
restartBroker();
+ createAdvisorySubscription();
assertCount(0, 0);
}
}
@@ -195,6 +207,20 @@ public class DurableSubscriptionUnsubscr
}
}
+ private final AtomicInteger advisories = new AtomicInteger(0);
+
+ private void createAdvisorySubscription() throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(topic));
+ advisoryConsumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ if (((ActiveMQMessage)message).getDataStructure() instanceof RemoveSubscriptionInfo) {
+ advisories.incrementAndGet();
+ }
+ }
+ });
+ }
private void assertCount(int all, int active) throws Exception {
int inactive = all - active;
@@ -224,6 +250,9 @@ public class DurableSubscriptionUnsubscr
// check the strange false MBean
if (all == 0)
assertEquals(0, countMBean());
+
+ // check if we got all advisories
+ assertEquals(100, all + advisories.get());
}
private int countMBean() throws MalformedObjectNameException, InstanceNotFoundException {