You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/04/10 10:12:14 UTC
svn commit: r392904 - in /incubator/activemq/trunk/activemq-core: ./
src/main/java/org/apache/activemq/advisory/
src/main/java/org/apache/activemq/broker/region/
Author: rajdavies
Date: Mon Apr 10 01:12:09 2006
New Revision: 392904
URL: http://svn.apache.org/viewcvs?rev=392904&view=rev
Log:
fixes for memory leaks
Modified:
incubator/activemq/trunk/activemq-core/project.xml
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
Modified: incubator/activemq/trunk/activemq-core/project.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/project.xml?rev=392904&r1=392903&r2=392904&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/project.xml (original)
+++ incubator/activemq/trunk/activemq-core/project.xml Mon Apr 10 01:12:09 2006
@@ -348,8 +348,9 @@
<!-- This test currently fails -->
<exclude>**/ItStillMarshallsTheSameTest.*</exclude>
- <!-- This test currently fails -->
+ <!-- Kaha in flux - removing tests -->
<exclude>**/KahaXARecoveryBrokerTest.*</exclude>
+ <exclude>**/KahaRecoveryBrokerTest.*</exclude>
<!-- https://issues.apache.org/activemq/browse/AMQ-522 -->
<exclude>**/ProxyConnectorTest.*</exclude>
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=392904&r1=392903&r2=392904&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Mon Apr 10 01:12:09 2006
@@ -62,6 +62,8 @@
super(next);
advisoryProducerId.setConnectionId(idGenerator.generateId());
}
+
+
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
next.addConnection(context, info);
@@ -149,11 +151,14 @@
next.removeDestination(context, destination, timeout);
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
DestinationInfo info = (DestinationInfo) destinations.remove(destination);
- if( info !=null ) {
+ if( info !=null && info.getDestination() != null && topic != null) {
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
fireAdvisory(context, topic, info);
+ next.removeDestination(context,topic,timeout);
+ next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), timeout);
+ next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), timeout);
}
- next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), timeout);
+
}
public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=392904&r1=392903&r2=392904&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Mon Apr 10 01:12:09 2006
@@ -90,26 +90,25 @@
}
}
- public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
- throws Exception {
-
- // The destination cannot be removed if there are any active subscriptions
- for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
- Subscription sub = (Subscription) iter.next();
- if( sub.matches(destination) ) {
- throw new JMSException("Destination still has an active subscription: "+ destination);
+ public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout)
+ throws Exception{
+ // The destination cannot be removed if there are any active subscriptions
+ for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
+ Subscription sub=(Subscription) iter.next();
+ if(sub.matches(destination)){
+ throw new JMSException("Destination still has an active subscription: "+destination);
}
}
-
log.debug("Removing destination: "+destination);
synchronized(destinationsMutex){
Destination dest=(Destination) destinations.remove(destination);
- if(dest==null)
- throw new IllegalArgumentException("The destination does not exist: "+destination);
-
- destinationMap.removeAll(destination);
- dest.dispose(context);
- dest.stop();
+ if(dest!=null){
+ destinationMap.removeAll(destination);
+ dest.dispose(context);
+ dest.stop();
+ }else{
+ log.debug("Destination doesn't exist: " + dest);
+ }
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=392904&r1=392903&r2=392904&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Apr 10 01:12:09 2006
@@ -58,7 +58,7 @@
protected final ActiveMQDestination destination;
protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
protected final Valve dispatchValve = new Valve(true);
- protected final TopicMessageStore store;
+ protected final TopicMessageStore store;//this could be NULL! (If an advsiory)
protected final UsageManager usageManager;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
@@ -72,7 +72,7 @@
TaskRunnerFactory taskFactory) {
this.destination = destination;
- this.store = store;
+ this.store = store; //this could be NULL! (If an advsiory)
this.usageManager = new UsageManager(memoryManager);
this.usageManager.setLimit(Long.MAX_VALUE);
@@ -287,7 +287,7 @@
}
public Message loadMessage(MessageId messageId) throws IOException {
- return store.getMessage(messageId);
+ return store != null ? store.getMessage(messageId) : null;
}
public void start() throws Exception {
@@ -301,19 +301,21 @@
public Message[] browse(){
final Set result=new CopyOnWriteArraySet();
try{
- store.recover(new MessageRecoveryListener(){
- public void recoverMessage(Message message) throws Exception{
- result.add(message);
- }
+ if(store!=null){
+ store.recover(new MessageRecoveryListener(){
+ public void recoverMessage(Message message) throws Exception{
+ result.add(message);
+ }
- public void recoverMessageReference(String messageReference) throws Exception{}
+ public void recoverMessageReference(String messageReference) throws Exception{}
- public void finished(){}
- });
- Message[] msgs=subscriptionRecoveryPolicy.browse(getActiveMQDestination());
- if(msgs!=null){
- for(int i=0;i<msgs.length;i++){
- result.add(msgs[i]);
+ public void finished(){}
+ });
+ Message[] msgs=subscriptionRecoveryPolicy.browse(getActiveMQDestination());
+ if(msgs!=null){
+ for(int i=0;i<msgs.length;i++){
+ result.add(msgs[i]);
+ }
}
}
}catch(Throwable e){
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=392904&r1=392903&r2=392904&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Mon Apr 10 01:12:09 2006
@@ -22,6 +22,7 @@
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
+import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
@@ -154,7 +155,11 @@
// Implementation methods
// -------------------------------------------------------------------------
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
- TopicMessageStore store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
+ TopicMessageStore store = null;
+ if (!AdvisorySupport.isAdvisoryTopic(destination)){
+ store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
+ }
+
Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
configureTopic(topic, destination);