You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/09/02 09:03:31 UTC

svn commit: r439552 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/kaha/impl/ test/...

Author: rajdavies
Date: Sat Sep  2 00:03:30 2006
New Revision: 439552

URL: http://svn.apache.org/viewvc?rev=439552&view=rev
Log:
some ground work for http://issues.apache.org/activemq/browse/AMQ-845
changed pending linked list to use a PendingMessageCursor interface instead

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java   (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Sat Sep  2 00:03:30 2006
@@ -33,6 +33,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.kaha.Store;
 
 /**
  * The Message Broker which routes messages,
@@ -251,4 +252,12 @@
      * Sets the default administration connection context used when configuring the broker on startup or via JMX
      */
     public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext);
+    
+    
+    /**
+     * @return the broker's temp data store
+     * @throws Exception
+     */
+    
+    public Store getTempDataStore();
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Sat Sep  2 00:03:30 2006
@@ -35,6 +35,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.kaha.Store;
 
 import java.util.Map;
 import java.util.Set;
@@ -231,5 +232,10 @@
     public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
         next.setAdminConnectionContext(adminConnectionContext);
     }
+
+    public Store getTempDataStore() {
+        return next.getTempDataStore();
+    }
+    
     
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Sat Sep  2 00:03:30 2006
@@ -17,9 +17,19 @@
  */
 package org.apache.activemq.broker;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import org.apache.activemq.ActiveMQConnectionMetaData;
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisoryBroker;
@@ -34,15 +44,19 @@
 import org.apache.activemq.broker.jmx.NetworkConnectorView;
 import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
 import org.apache.activemq.broker.jmx.ProxyConnectorView;
+import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
 import org.apache.activemq.broker.region.DestinationFactory;
 import org.apache.activemq.broker.region.DestinationFactoryImpl;
-import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
 import org.apache.activemq.broker.region.DestinationInterceptor;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.virtual.*;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.kaha.Store;
+import org.apache.activemq.kaha.StoreFactory;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.network.ConnectionFilter;
 import org.apache.activemq.network.DiscoveryNetworkConnector;
@@ -64,24 +78,8 @@
 import org.apache.activemq.util.URISupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import javax.management.InstanceNotFoundException;
-import javax.management.JMException;
-import javax.management.MBeanRegistrationException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a number of transport
@@ -105,6 +103,7 @@
     private boolean shutdownOnMasterFailure = false;
     private String brokerName = "localhost";
     private File dataDirectory;
+    private File tmpDataDirectory;
     private Broker broker;
     private BrokerView adminView;
     private ManagementContext managementContext;
@@ -139,6 +138,7 @@
     private BrokerId brokerId;
     private DestinationInterceptor[] destinationInterceptors;
     private ActiveMQDestination[] destinations;
+    private Store tempDataStore;
 
     /**
      * Adds a new transport connector for the given bind address
@@ -530,6 +530,24 @@
     public void setDataDirectory(File dataDirectory) {
         this.dataDirectory = dataDirectory;
     }
+    
+    /**
+     * @return the tmpDataDirectory
+     */
+    public File getTmpDataDirectory(){
+        if (tmpDataDirectory == null) {
+            tmpDataDirectory = new File(getDataDirectory(), "tmp_storage");
+        }
+        return tmpDataDirectory;
+    }
+
+    /**
+     * @param tmpDataDirectory the tmpDataDirectory to set
+     */
+    public void setTmpDataDirectory(File tmpDataDirectory){
+        this.tmpDataDirectory=tmpDataDirectory;
+    }
+    
 
     public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
         this.persistenceFactory = persistenceFactory;
@@ -906,6 +924,29 @@
     public void setDestinations(ActiveMQDestination[] destinations) {
         this.destinations = destinations;
     }
+    
+    /**
+     * @return the tempDataStore
+     */
+    public Store getTempDataStore() {
+        if (tempDataStore == null){
+            String name = getTmpDataDirectory().getPath();
+            try {
+            StoreFactory.delete(name);
+            tempDataStore = StoreFactory.open(name,"rw");
+            }catch(IOException e){
+                throw new RuntimeException(e);
+            }
+        }
+        return tempDataStore;
+    }
+
+    /**
+     * @param tempDataStore the tempDataStore to set
+     */
+    public void setTempDataStore(Store tempDataStore){
+        this.tempDataStore=tempDataStore;
+    }   
 
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -1386,5 +1427,6 @@
             masterConnector = (MasterConnector) service;
         }
     }
-   
+
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Sat Sep  2 00:03:30 2006
@@ -35,6 +35,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.kaha.Store;
 
 import java.util.Collections;
 import java.util.Map;
@@ -227,6 +228,10 @@
 
     
     public Response messagePull(ConnectionContext context, MessagePull pull) {
+        return null;
+    }
+    
+    public Store getTempDataStore() {
         return null;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Sat Sep  2 00:03:30 2006
@@ -39,6 +39,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.kaha.Store;
 
 /**
  * Implementation of the broker where all it's methods throw an
@@ -227,6 +228,10 @@
     }
 
     public Response messagePull(ConnectionContext context, MessagePull pull) {
+        throw new BrokerStoppedException(this.message);
+    }
+    
+    public Store getTempDataStore() {
         throw new BrokerStoppedException(this.message);
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Sat Sep  2 00:03:30 2006
@@ -35,6 +35,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.kaha.Store;
 
 import java.util.Map;
 import java.util.Set;
@@ -242,6 +243,10 @@
 
     public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
         return getNext().messagePull(context, pull);
+    }
+    
+    public Store getTempDataStore() {
+        return getNext().getTempDataStore();
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Sat Sep  2 00:03:30 2006
@@ -19,17 +19,16 @@
 
 import java.io.IOException;
 import java.util.Iterator;
-
 import javax.jms.InvalidSelectorException;
-
+import javax.jms.JMSException;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.util.SubscriptionKey;
-
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 
 public class DurableTopicSubscription extends PrefetchSubscription {
@@ -41,7 +40,8 @@
     private boolean active=false;
     
     public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
-        super(broker,context, info);
+        //super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
+        super(broker,context,info);
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
     }
@@ -102,7 +102,7 @@
             }
             if( keepDurableSubsActive ) {
             	synchronized(pending) {
-            		pending.addFirst(node);
+            		pending.addMessageFirst(node);
             	}
             } else {
                 node.decrementReferenceCount();
@@ -112,10 +112,11 @@
         
         if( !keepDurableSubsActive ) {
         	synchronized(pending) {
-	            for (Iterator iter = pending.iterator(); iter.hasNext();) {
-	                MessageReference node = (MessageReference) iter.next();
+                pending.reset();
+	            while(pending.hasNext()) {
+	                MessageReference node = pending.next();
 	                node.decrementReferenceCount();
-	                iter.remove();
+	                pending.remove();
 	            }
         	}
         }
@@ -189,8 +190,9 @@
     synchronized public void destroy() {
     	
     	synchronized(pending) {
-	        for (Iterator iter = pending.iterator(); iter.hasNext();) {
-	            MessageReference node = (MessageReference) iter.next();
+            pending.reset();
+	        while(pending.hasNext()) {
+	            MessageReference node = pending.next();
 	            node.decrementReferenceCount();
 	        }
 	        pending.clear();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Sat Sep  2 00:03:30 2006
@@ -20,12 +20,14 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedList;
-
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
-
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerControl;
@@ -50,7 +52,7 @@
 abstract public class PrefetchSubscription extends AbstractSubscription{
     
     static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
-    final protected LinkedList pending=new LinkedList();
+    final protected PendingMessageCursor pending;
     final protected LinkedList dispatched=new LinkedList();
     
     protected int prefetchExtension=0;
@@ -59,10 +61,16 @@
     long enqueueCounter;
     long dispatchCounter;
     long dequeueCounter;
-        
-    public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
-                    throws InvalidSelectorException{
+    
+    public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor)
+                    throws  InvalidSelectorException{
         super(broker,context,info);
+        pending = cursor;
+    }
+    
+    public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
+    throws  InvalidSelectorException{
+       this(broker,context,info,new VMPendingMessageCursor()); 
     }
 
     
@@ -77,8 +85,8 @@
             prefetchExtension++;
             
             final long dispatchCounterBeforePull = dispatchCounter;
-        	dispatchMatched();
-        	
+            dispatchMatched();
+            
         	// If there was nothing dispatched.. we may need to setup a timeout.
         	if( dispatchCounterBeforePull == dispatchCounter ) {
         		// imediate timeout used by receiveNoWait()
@@ -86,7 +94,7 @@
         			// Send a NULL message.
 	            	add(QueueMessageReference.NULL_MESSAGE);
 	            	dispatchMatched();
-        		}
+        }
         		if( pull.getTimeout() > 0 ) {
 	            	Scheduler.executeAfterDelay(new Runnable(){
 							public void run() {
@@ -124,17 +132,18 @@
                 if( pending.isEmpty() ) {
                     log.debug("Prefetch limit.");
                 }
-                pending.addLast(node);
+                pending.addMessageLast(node);
             }
         }
     }
 
     synchronized public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
         synchronized(pending){
-            for(Iterator i=pending.iterator();i.hasNext();){
-                MessageReference node=(MessageReference) i.next();
+            pending.reset();
+            while(pending.hasNext()){
+                MessageReference node=pending.next();
                 if(node.getMessageId().equals(mdn.getMessageId())){
-                    i.remove();
+                    pending.remove();
                     createMessageDispatch(node,node.getMessage());
                     dispatched.addLast(node);
                     return;
@@ -329,9 +338,10 @@
         if(!dispatching){
             dispatching=true;
             try{
-                for(Iterator iter=pending.iterator();iter.hasNext()&&!isFull();){
-                    MessageReference node=(MessageReference) iter.next();
-                    iter.remove();
+                pending.reset();
+                while(pending.hasNext()&&!isFull()){
+                    MessageReference node=pending.next();
+                    pending.remove();
                     dispatch(node);
                 }
             }finally{
@@ -352,8 +362,8 @@
 
             // NULL messages don't count... they don't get Acked.
             if( node != QueueMessageReference.NULL_MESSAGE ) {
-        		dispatchCounter++;
-        		dispatched.addLast(node);            
+            dispatchCounter++;
+            dispatched.addLast(node);            
             } else {
             	prefetchExtension=Math.max(0,prefetchExtension-1);
             }
@@ -380,8 +390,8 @@
     synchronized protected void onDispatch(final MessageReference node,final Message message){
         if(node.getRegionDestination()!=null){
         	if( node != QueueMessageReference.NULL_MESSAGE ) {
-	            node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
-	            context.getConnection().getStatistics().onMessageDequeue(message);
+            node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
+            context.getConnection().getStatistics().onMessageDequeue(message);
         	}
             try{
                 dispatchMatched();
@@ -412,19 +422,19 @@
      */
     protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
         if( node == QueueMessageReference.NULL_MESSAGE ) {
-            MessageDispatch md = new MessageDispatch();
+        MessageDispatch md=new MessageDispatch();
             md.setMessage(null);
-            md.setConsumerId( info.getConsumerId() );
+        md.setConsumerId(info.getConsumerId());
             md.setDestination( null );
             return md;
         } else {
             MessageDispatch md=new MessageDispatch();
             md.setConsumerId(info.getConsumerId());
-            md.setDestination(node.getRegionDestination().getActiveMQDestination());
-            md.setMessage(message);
-            md.setRedeliveryCounter(node.getRedeliveryCounter());
-            return md;
-        }
+        md.setDestination(node.getRegionDestination().getActiveMQDestination());
+        md.setMessage(message);
+        md.setRedeliveryCounter(node.getRedeliveryCounter());
+        return md;
+    }
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Sat Sep  2 00:03:30 2006
@@ -18,13 +18,13 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-
 import javax.jms.InvalidSelectorException;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.filter.MessageEvaluationContext;
 
 public class QueueBrowserSubscription extends QueueSubscription {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Sat Sep  2 00:03:30 2006
@@ -17,6 +17,9 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.io.IOException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.group.MessageGroupMap;
@@ -28,11 +31,6 @@
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-
-import java.io.IOException;
 
 public class QueueSubscription extends PrefetchSubscription implements LockOwner {
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Sat Sep  2 00:03:30 2006
@@ -42,6 +42,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.kaha.Store;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
@@ -572,5 +573,9 @@
  
     public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
         this.adminConnectionContext = adminConnectionContext;
+    }
+    
+    public Store getTempDataStore() {
+        return brokerService.getTempDataStore();
     }
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=439552&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Sat Sep  2 00:03:30 2006
@@ -0,0 +1,132 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+import java.io.IOException;
+import java.util.*;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.kaha.*;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.kahadaptor.CommandMarshaller;
+/**
+ * perist pending messages pending message (messages awaiting disptach to a consumer) cursor
+ * 
+ * @version $Revision$
+ */
+public class FilePendingMessageCursor implements PendingMessageCursor{
+    private ListContainer list;
+    private Iterator iter = null;
+    private Destination regionDestination;
+    
+    /**
+     * @param name
+     * @param store
+     * @throws IOException
+     */
+    public FilePendingMessageCursor(String name, Store store) {
+        try{
+            list = store.getListContainer(name);
+            list.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
+            list.setMaximumCacheSize(0);
+        }catch(IOException e){
+            throw new RuntimeException(e);
+        }
+    }
+    /**
+     * @return true if there are no pending messages
+     */
+    public boolean isEmpty(){
+        return list.isEmpty();
+    }
+
+    /**
+     * reset the cursor
+     *
+     */
+    public void reset(){
+        iter = list.listIterator();
+    }
+    
+    /**
+     * add message to await dispatch
+     * 
+     * @param node
+     */
+    public void addMessageLast(MessageReference node){
+        try{
+            regionDestination = node.getMessage().getRegionDestination();
+            node.decrementReferenceCount();
+        }catch(IOException e){
+           throw new RuntimeException(e);
+        }
+        list.addLast(node);
+    }
+    
+    /**
+     * add message to await dispatch
+     * @param position 
+     * @param node
+     */
+    public void addMessageFirst(MessageReference node){
+        try{
+            regionDestination = node.getMessage().getRegionDestination();
+            node.decrementReferenceCount();
+        }catch(IOException e){
+           throw new RuntimeException(e);
+        }
+        list.addFirst(node);
+    }
+
+
+    /**
+     * @return true if there pending messages to dispatch
+     */
+    public boolean hasNext(){
+       return iter.hasNext();
+    }
+
+    /**
+     * @return the next pending message
+     */
+    public MessageReference next(){
+        Message message = (Message) iter.next();
+        message.setRegionDestination(regionDestination);
+        message.incrementReferenceCount();
+        return message;
+    }
+
+    /**
+     * remove the message at the cursor position
+     * 
+     */
+    public void remove(){
+        iter.remove();
+    }
+
+    /**
+     * @return the number of pending messages
+     */
+    public int size(){
+        return list.size();
+    }
+
+    /**
+     * clear all pending messages
+     * 
+     */
+    public void clear(){
+        list.clear();
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=439552&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Sat Sep  2 00:03:30 2006
@@ -0,0 +1,73 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.broker.region.MessageReference;
+
+/**
+ * Interface to pending message (messages awaiting disptach to a consumer) cursor
+ * 
+ * @version $Revision$
+ */
+public interface PendingMessageCursor{
+    /**
+     * @return true if there are no pending messages
+     */
+    public boolean isEmpty();
+    
+    /**
+     * reset the cursor
+     *
+     */
+    public void reset();
+
+    /**
+     * add message to await dispatch
+     * @param node
+     */
+    public void addMessageLast(MessageReference node);
+    
+    /**
+     * add message to await dispatch
+     * @param node
+     */
+    public void addMessageFirst(MessageReference node);
+
+    /**
+     * @return true if there pending messages to dispatch
+     */
+    public boolean hasNext();
+
+    /**
+     * @return the next pending message
+     */
+    public MessageReference next();
+
+    /**
+     * remove the message at the cursor position
+     * 
+     */
+    public void remove();
+
+    /**
+     * @return the number of pending messages
+     */
+    public int size();
+
+    /**
+     * clear all pending messages
+     * 
+     */
+    public void clear();
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=439552&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Sat Sep  2 00:03:30 2006
@@ -0,0 +1,96 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+import java.util.Iterator;
+import java.util.LinkedList;
+import org.apache.activemq.broker.region.MessageReference;
+/**
+ * hold pending messages in a linked list (messages awaiting disptach to a consumer) cursor
+ * 
+ * @version $Revision$
+ */
+public class VMPendingMessageCursor implements PendingMessageCursor{
+    private LinkedList list = new LinkedList();
+    private Iterator iter = null;
+    /**
+     * @return true if there are no pending messages
+     */
+    public boolean isEmpty(){
+        return list.isEmpty();
+    }
+
+    /**
+     * reset the cursor
+     *
+     */
+    public void reset(){
+        iter = list.listIterator();
+    }
+    
+    /**
+     * add message to await dispatch
+     * 
+     * @param node
+     */
+    public void addMessageLast(MessageReference node){
+        list.addLast(node);
+    }
+    
+    /**
+     * add message to await dispatch
+     * @param position 
+     * @param node
+     */
+    public void addMessageFirst(MessageReference node){
+        list.addFirst(node);
+    }
+
+
+    /**
+     * @return true if there pending messages to dispatch
+     */
+    public boolean hasNext(){
+       return iter.hasNext();
+    }
+
+    /**
+     * @return the next pending message
+     */
+    public MessageReference next(){
+        return (MessageReference) iter.next();
+    }
+
+    /**
+     * remove the message at the cursor position
+     * 
+     */
+    public void remove(){
+        iter.remove();
+    }
+
+    /**
+     * @return the number of pending messages
+     */
+    public int size(){
+        return list.size();
+    }
+
+    /**
+     * clear all pending messages
+     * 
+     */
+    public void clear(){
+        list.clear();
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Sat Sep  2 00:03:30 2006
@@ -59,7 +59,8 @@
     public KahaStore(String name,String mode) throws IOException{
         this.name=name;
         this.mode=mode;
-        initialize();
+        directory=new File(name);
+        directory.mkdirs();
     }
 
     public synchronized void close() throws IOException{
@@ -113,22 +114,22 @@
     }
 
     public synchronized boolean delete() throws IOException{
-        initialize();
-        clear();
         boolean result=true;
-        for(Iterator iter=indexManagers.values().iterator();iter.hasNext();){
-            IndexManager im=(IndexManager) iter.next();
-            result&=im.delete();
-            iter.remove();
-        }
-        for(Iterator iter=dataManagers.values().iterator();iter.hasNext();){
-            DataManager dm=(DataManager) iter.next();
-            result&=dm.delete();
-            iter.remove();
-        }
-        // now delete all the files - containers that don't use the standard DataManager
-        // and IndexManager will not have initialized the files - so these will be left around
-        // unless we do this
+        if (initialized){
+            clear();
+            
+            for(Iterator iter=indexManagers.values().iterator();iter.hasNext();){
+                IndexManager im=(IndexManager) iter.next();
+                result&=im.delete();
+                iter.remove();
+            }
+            for(Iterator iter=dataManagers.values().iterator();iter.hasNext();){
+                DataManager dm=(DataManager) iter.next();
+                result&=dm.delete();
+                iter.remove();
+            }
+        }
+       
         if(directory!=null&&directory.isDirectory()){
             File[] files=directory.listFiles();
             if(files!=null){
@@ -248,10 +249,8 @@
     		throw new IOException("Store has been closed.");
         if(!initialized){
             initialized=true;
-            directory=new File(name);
-            directory.mkdirs();
-            log.info("Kaha Store using data directory " + directory);
             
+            log.info("Kaha Store using data directory " + directory);
             DataManager defaultDM = getDataManager(DEFAULT_CONTAINER_NAME);
             rootIndexManager = getIndexManager(defaultDM, DEFAULT_CONTAINER_NAME);
             

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java Sat Sep  2 00:03:30 2006
@@ -31,7 +31,9 @@
     protected int MAX_CACHE_SIZE=10;
 
     protected KahaStore getStore() throws IOException{
-        return new KahaStore(name,"rw");
+        KahaStore store = new KahaStore(name,"rw");
+        store.initialize();
+        return store;
     }
 
     public void testAdds() throws Exception{

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java?rev=439552&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java Sat Sep  2 00:03:30 2006
@@ -0,0 +1,133 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.perf;
+
+import java.io.File;
+import java.net.URI;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import junit.framework.AssertionFailedError;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+/**
+ * @version $Revision$
+ */
+public class InactiveDurableTopicTest extends TestCase{
+    private static final String DEFAULT_PASSWORD="";
+    private static final String USERNAME="testuser";
+    private static final String CLIENTID="mytestclient";
+    private static final String TOPIC_NAME="testevent";
+    private static final String SUBID="subscription1";
+    private static final int deliveryMode=javax.jms.DeliveryMode.PERSISTENT;
+    private static final int deliveryPriority=javax.jms.Message.DEFAULT_PRIORITY;
+    private Connection connection=null;
+    private MessageProducer publisher=null;
+    private TopicSubscriber subscriber=null;
+    private Topic topic=null;
+    private Session session=null;
+    ActiveMQConnectionFactory connectionFactory=null;
+    BrokerService broker;
+
+    protected void setUp() throws Exception{
+        super.setUp();
+        broker=new BrokerService();
+        
+        broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File ("TEST_STUFD")));
+        broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
+        broker.start();
+        connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
+        /*
+         * Doesn't matter if you enable or disable these, so just leaving them out for this test case
+         * connectionFactory.setAlwaysSessionAsync(true); connectionFactory.setAsyncDispatch(true);
+         */
+        connectionFactory.setUseAsyncSend(true);
+    }
+
+    protected void tearDown() throws Exception{
+        super.tearDown();
+        broker.stop();
+    }
+
+    public void test1CreateSubscription() throws Exception{
+        try{
+            /*
+             * Step 1 - Establish a connection with a client id and create a durable subscription
+             */
+            connection=connectionFactory.createConnection(USERNAME,DEFAULT_PASSWORD);
+            assertNotNull(connection);
+            connection.setClientID(CLIENTID);
+            session=connection.createSession(false,javax.jms.Session.CLIENT_ACKNOWLEDGE);
+            assertNotNull(session);
+            topic=session.createTopic(TOPIC_NAME);
+            assertNotNull(topic);
+            subscriber=session.createDurableSubscriber(topic,SUBID,"",false);
+            assertNotNull(subscriber);
+            subscriber.close();
+            session.close();
+            connection.close();
+        }catch(JMSException ex){
+            try{
+                connection.close();
+            }catch(Exception ignore){}
+            throw new AssertionFailedError("Create Subscription caught: "+ex);
+        }
+    }
+
+    public void test2ProducerTestCase(){
+        /*
+         * Step 2 - Establish a connection without a client id and create a producer and start pumping messages. We will
+         * get hung
+         */
+        try{
+            connection=connectionFactory.createConnection(USERNAME,DEFAULT_PASSWORD);
+            assertNotNull(connection);
+            session=connection.createSession(false,javax.jms.Session.CLIENT_ACKNOWLEDGE);
+            assertNotNull(session);
+            topic=session.createTopic(TOPIC_NAME);
+            assertNotNull(topic);
+            publisher=session.createProducer(topic);
+            assertNotNull(publisher);
+            MapMessage msg=session.createMapMessage();
+            assertNotNull(msg);
+            msg.setString("key1","value1");
+            int loop;
+            for(loop=0;loop<100000;loop++){
+                msg.setInt("key2",loop);
+                publisher.send(msg,deliveryMode,deliveryPriority,Message.DEFAULT_TIME_TO_LIVE);
+                if (loop%500==0){
+                    System.out.println("Sent " + loop + " messages");
+                }
+            }
+            this.assertEquals(loop,100000);
+            publisher.close();
+            session.close();
+            connection.stop();
+            connection.stop();
+        }catch(JMSException ex){
+            try{
+                connection.close();
+            }catch(Exception ignore){}
+            throw new AssertionFailedError("Create Subscription caught: "+ex);
+        }
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain