You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/04/21 15:41:25 UTC
svn commit: r650143 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/virtual/
test/java/org/apache/activemq/advisory/ test/java/org/ap...
Author: rajdavies
Date: Mon Apr 21 06:41:19 2008
New Revision: 650143
URL: http://svn.apache.org/viewvc?rev=650143&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1672
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon Apr 21 06:41:19 2008
@@ -1235,6 +1235,10 @@
return getBroker().addDestination(getAdminConnectionContext(), destination);
}
+ public void removeDestination(ActiveMQDestination destination) throws Exception {
+ getBroker().removeDestination(getAdminConnectionContext(), destination,0);
+ }
+
public int getProducerSystemUsagePortion() {
return producerSystemUsagePortion;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Mon Apr 21 06:41:19 2008
@@ -183,6 +183,10 @@
}
destinationMap.removeAll(destination);
dispose(context,dest);
+ DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
+ if (destinationInterceptor != null) {
+ destinationInterceptor.remove(dest);
+ }
} else {
LOG.debug("Destination doesn't exist: " + dest);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Mon Apr 21 06:41:19 2008
@@ -17,15 +17,12 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.management.ObjectName;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -44,7 +41,6 @@
private static final Log LOG = LogFactory.getLog(AbstractSubscription.class);
protected Broker broker;
- protected Destination destination;
protected ConnectionContext context;
protected ConsumerInfo info;
protected final DestinationFilter destinationFilter;
@@ -53,9 +49,8 @@
private ObjectName objectName;
- public AbstractSubscription(Broker broker, Destination destination,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+ public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
this.broker = broker;
- this.destination=destination;
this.context = context;
this.info = info;
this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java Mon Apr 21 06:41:19 2008
@@ -35,5 +35,12 @@
}
return destination;
}
+
+
+ public void remove(Destination destination) {
+ for (int i = 0; i < interceptors.length; i++) {
+ interceptors[i].remove(destination);
+ }
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Mon Apr 21 06:41:19 2008
@@ -19,7 +19,6 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -30,7 +29,6 @@
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
/**
*
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java Mon Apr 21 06:41:19 2008
@@ -25,5 +25,7 @@
public interface DestinationInterceptor {
Destination intercept(Destination destination);
+
+ void remove(Destination destination);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Apr 21 06:41:19 2008
@@ -49,23 +49,14 @@
private final boolean keepDurableSubsActive;
private boolean active;
- public DurableTopicSubscription(Broker broker, Destination dest,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
+ public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws JMSException {
- super(broker,dest,usageManager, context, info);
+ super(broker,usageManager, context, info);
this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
this.pending.setSystemUsage(usageManager);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
- if (dest != null && dest.getMessageStore() != null) {
- TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
- try {
- this.enqueueCounter=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
- } catch (IOException e) {
- JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
- jmsEx.setLinkedException(e);
- throw jmsEx;
- }
- }
+
}
public boolean isActive() {
@@ -82,6 +73,16 @@
public void add(ConnectionContext context, Destination destination) throws Exception {
super.add(context, destination);
destinations.put(destination.getActiveMQDestination(), destination);
+ if (destination.getMessageStore() != null) {
+ TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
+ try {
+ this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
+ } catch (IOException e) {
+ JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ }
if (active || keepDurableSubsActive) {
Topic topic = (Topic)destination;
topic.activate(context, this);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Apr 21 06:41:19 2008
@@ -66,14 +66,14 @@
private final Object dispatchLock = new Object();
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
- public PrefetchSubscription(Broker broker,Destination destination, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
- super(broker,destination, context, info);
+ public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
+ super(broker,context, info);
this.usageManager=usageManager;
pending = cursor;
}
- public PrefetchSubscription(Broker broker,Destination destination, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
- this(broker,destination,usageManager,context, info, new VMPendingMessageCursor());
+ public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+ this(broker,usageManager,context, info, new VMPendingMessageCursor());
}
/**
@@ -168,9 +168,10 @@
+ mdn.getMessageId() + ") was not in the pending list");
}
- public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
+ public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
// Handle the standard acknowledgment case.
boolean callDispatchMatched = false;
+ Destination destination = null;
synchronized(dispatchLock) {
if (ack.isStandardAck()) {
// Acknowledge all dispatched messages up till the message id of
@@ -233,6 +234,7 @@
prefetchExtension = Math.max(0,
prefetchExtension - (index + 1));
}
+ destination = node.getRegionDestination();
callDispatchMatched = true;
break;
}
@@ -268,6 +270,7 @@
}
if (ack.getLastMessageId().equals(node.getMessageId())) {
prefetchExtension = Math.max(prefetchExtension, index + 1);
+ destination = node.getRegionDestination();
callDispatchMatched = true;
break;
}
@@ -294,6 +297,7 @@
if (inAckRange) {
node.incrementRedeliveryCounter();
if (ack.getLastMessageId().equals(messageId)) {
+ destination = node.getRegionDestination();
callDispatchMatched = true;
break;
}
@@ -335,6 +339,7 @@
if (ack.getLastMessageId().equals(messageId)) {
prefetchExtension = Math.max(0, prefetchExtension
- (index + 1));
+ destination = node.getRegionDestination();
callDispatchMatched = true;
break;
}
@@ -350,7 +355,7 @@
}
}
}
- if (callDispatchMatched) {
+ if (callDispatchMatched && destination != null) {
if (destination.isLazyDispatch()) {
destination.wakeup();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Mon Apr 21 06:41:19 2008
@@ -17,9 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
-
import javax.jms.InvalidSelectorException;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConsumerInfo;
@@ -33,9 +31,9 @@
boolean browseDone;
boolean destinationsAdded;
- public QueueBrowserSubscription(Broker broker,Destination destination, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
+ public QueueBrowserSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
throws InvalidSelectorException {
- super(broker,destination,usageManager, context, info);
+ super(broker,usageManager, context, info);
}
protected boolean canDispatch(MessageReference node) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java Mon Apr 21 06:41:19 2008
@@ -46,18 +46,11 @@
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
throws JMSException {
- Destination dest = null;
- try {
- dest = lookup(context, info.getDestination());
- } catch (Exception e) {
- JMSException jmsEx = new JMSException("Failed to retrieve destination from region "+ e);
- jmsEx.setLinkedException(e);
- throw jmsEx;
- }
+
if (info.isBrowser()) {
- return new QueueBrowserSubscription(broker,dest,usageManager, context, info);
+ return new QueueBrowserSubscription(broker,usageManager, context, info);
} else {
- return new QueueSubscription(broker, dest,usageManager,context, info);
+ return new QueueSubscription(broker, usageManager,context, info);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Mon Apr 21 06:41:19 2008
@@ -17,10 +17,8 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
-
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupMap;
@@ -28,7 +26,6 @@
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,8 +34,8 @@
private static final Log LOG = LogFactory.getLog(QueueSubscription.class);
- public QueueSubscription(Broker broker, Destination destination,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
- super(broker,destination,usageManager, context, info);
+ public QueueSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+ super(broker,usageManager, context, info);
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Mon Apr 21 06:41:19 2008
@@ -50,18 +50,10 @@
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
- Destination dest=null;
- try {
- dest = lookup(context, info.getDestination());
- } catch (Exception e) {
- JMSException jmsEx = new JMSException("Failed to retrieve destination from region "+ e);
- jmsEx.setLinkedException(e);
- throw jmsEx;
- }
if (info.isBrowser()) {
- return new QueueBrowserSubscription(broker,dest,usageManager,context, info);
+ return new QueueBrowserSubscription(broker,usageManager,context, info);
} else {
- return new QueueSubscription(broker,dest, usageManager,context, info);
+ return new QueueSubscription(broker,usageManager,context, info);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java Mon Apr 21 06:41:19 2008
@@ -47,9 +47,7 @@
throw new JMSException("A durable subscription cannot be created for a temporary topic.");
}
try {
-
- Destination dest = lookup(context, info.getDestination());
- TopicSubscription answer = new TopicSubscription(broker, dest,context, info, usageManager);
+ TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
// lets configure the subscription depending on the destination
ActiveMQDestination destination = info.getDestination();
if (destination != null && broker.getDestinationPolicy() != null) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Mon Apr 21 06:41:19 2008
@@ -228,14 +228,7 @@
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
ActiveMQDestination destination = info.getDestination();
- Destination dest=null;
- try {
- dest = lookup(context, destination);
- } catch (Exception e) {
- JMSException jmsEx = new JMSException("Failed to retrieve destination from region "+ e);
- jmsEx.setLinkedException(e);
- throw jmsEx;
- }
+
if (info.isDurable()) {
if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
throw new JMSException("Cannot create a durable subscription for an advisory Topic");
@@ -245,7 +238,7 @@
if (sub == null) {
- sub = new DurableTopicSubscription(broker,dest, usageManager, context, info, keepDurableSubsActive);
+ sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
@@ -259,7 +252,7 @@
return sub;
}
try {
- TopicSubscription answer = new TopicSubscription(broker, dest,context, info, usageManager);
+ TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
// lets configure the subscription depending on the destination
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Mon Apr 21 06:41:19 2008
@@ -20,11 +20,8 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
import javax.jms.JMSException;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
@@ -65,8 +62,8 @@
private final AtomicLong dequeueCounter = new AtomicLong(0);
private int memoryUsageHighWaterMark = 95;
- public TopicSubscription(Broker broker, Destination destination,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
- super(broker, destination,context, info);
+ public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
+ super(broker, context, info);
this.usageManager = usageManager;
String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java Mon Apr 21 06:41:19 2008
@@ -34,6 +34,10 @@
public Destination intercept(Destination destination) {
return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isCopyMessage());
}
+
+
+ public void remove(Destination destination) {
+ }
public String getName() {
return name;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java Mon Apr 21 06:41:19 2008
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.virtual;
+import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java Mon Apr 21 06:41:19 2008
@@ -67,6 +67,22 @@
}
return destination;
}
+
+
+ public void remove(Destination destination) {
+ if (brokerService == null) {
+ throw new IllegalArgumentException("No brokerService injected!");
+ }
+ ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination());
+ if (topic != null) {
+ try {
+ brokerService.removeDestination(topic);
+ } catch (Exception e) {
+ LOG.error("Failed to remove mirror destination for " + destination + ". Reason: " + e,e);
+ }
+ }
+
+ }
// Properties
// -------------------------------------------------------------------------
@@ -124,4 +140,5 @@
protected ActiveMQDestination getMirrorTopic(ActiveMQDestination original) {
return new ActiveMQTopic(prefix + original.getPhysicalName() + postfix);
}
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java Mon Apr 21 06:41:19 2008
@@ -40,7 +40,7 @@
private DestinationMap destinationMap = new DestinationMap();
private VirtualDestination[] virtualDestinations;
- public Destination intercept(Destination destination) {
+ public synchronized Destination intercept(Destination destination) {
Set virtualDestinations = destinationMap.get(destination.getActiveMQDestination());
List<Destination> destinations = new ArrayList<Destination>();
for (Iterator iter = virtualDestinations.iterator(); iter.hasNext();) {
@@ -57,6 +57,10 @@
}
}
return destination;
+ }
+
+
+ public synchronized void remove(Destination destination) {
}
public VirtualDestination[] getVirtualDestinations() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java Mon Apr 21 06:41:19 2008
@@ -46,6 +46,10 @@
return new VirtualTopicInterceptor(destination, getPrefix(), getPostfix());
}
+
+ public void remove(Destination destination) {
+ }
+
// Properties
// -------------------------------------------------------------------------
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java Mon Apr 21 06:41:19 2008
@@ -25,6 +25,7 @@
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
/**
* @version $Revision: 397249 $
@@ -56,9 +57,9 @@
RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(
RegionBroker.class);
- //there should be 3 destinations - advisories -
- //1 for the connection + 2 generic ones
- assertTrue(rb.getDestinationMap().size()==3);
+ //there should be 2 destinations - advisories -
+ //1 for the connection + 1 generic ones
+ assertTrue(rb.getDestinationMap().size()==2);
}
public void testLoadTempAdvisoryTopics() throws Exception {
@@ -78,9 +79,9 @@
assertTrue(ab.getAdvisoryProducers().size() == 0);
RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(
RegionBroker.class);
- //there should be 3 destinations - advisories -
- //1 for the connection + 2 generic ones
- assertTrue(rb.getDestinationMap().size()==3);
+ //there should be 2 destinations - advisories -
+ //1 for the connection + 1 generic ones
+ assertTrue(rb.getDestinationMap().size()==2);
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java Mon Apr 21 06:41:19 2008
@@ -86,7 +86,7 @@
//serverDestination +
- assertTrue(rb.getDestinationMap().size()==7);
+ assertTrue(rb.getDestinationMap().size()==6);
}
protected void setUp() throws Exception {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java Mon Apr 21 06:41:19 2008
@@ -21,9 +21,11 @@
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
-
+import javax.jms.TemporaryQueue;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.spring.ConsumerBean;
@@ -71,6 +73,20 @@
messageList.assertMessagesArrived(total);
LOG.info("Received: " + messageList);
+ }
+
+ public void testTempMirroredQueuesClearDown() throws Exception{
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+ Session session = connection.createSession(false, 0);
+ TemporaryQueue tempQueue = session.createTemporaryQueue();
+ RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(
+ RegionBroker.class);
+ assertTrue(rb.getDestinationMap().size()==4);
+ tempQueue.delete();
+ assertTrue(rb.getDestinationMap().size()==3);
}
protected Destination createConsumeDestination() {