You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/06 19:07:57 UTC

svn commit: r563194 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/main/java/org/apache/activemq/command/ activemq-core/src/main/java/org/apache/activemq/openwire/v3/ activemq-core/src/main/java/o...

Author: chirino
Date: Mon Aug  6 10:07:55 2007
New Revision: 563194

URL: http://svn.apache.org/viewvc?view=rev&rev=563194
Log:
Fix for AMQ-1356 : Durable Subscriptions do not work with Wildcards after broker is restarted.
 - Added a RecoveryBrokerTest.testWildCardSubscriptionPreservedOnRestart() test case that was showing
   that that wildcards did not work with DurableSubscriptions
 - Fix the TransactedTopicMasterSlaveTest so that setDeleteAllMessagesOnStartup() takes effect (had to be done before the connectors are added.
 - Change the MessageStore interface so that subscriptions are created using the data in the SubscriptionInfo class
 - Added a subscribedDestination field to the SubscriptionInfo so that the original wildcard subscrption can be remembered
 - The KahaReference store now deletes it's State store too when deleteAllMessages() is called
 - Fixed KahaPersistenceAdapter.getDestinations() so that it actually returns all the destinations.
 - We now recover all the topics eagerly when the topic region is started.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/SubscriptionInfoMarshaller.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/SubscriptionInfoTest.java
    activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java
    activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java
    activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Mon Aug  6 10:07:55 2007
@@ -17,10 +17,13 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.JMSException;
 
@@ -36,14 +39,13 @@
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
+import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.DestinationMap;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  *
  * @version $Revision: 1.14 $
@@ -81,6 +83,16 @@
 
     public void start() throws Exception {
         started = true;
+        
+        Set inactiveDests = getInactiveDestinations();
+        for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
+        	ActiveMQDestination dest = (ActiveMQDestination) iter.next();
+        	
+        	ConnectionContext context = new ConnectionContext();
+        	context.setBroker(broker.getBrokerService().getBroker());
+        	context.getBroker().addDestination(context , dest);
+        }
+        
         for (Iterator i = destinations.values().iterator();i.hasNext();) {
             Destination dest = (Destination)i.next();
             dest.start();
@@ -110,18 +122,28 @@
                 dest.start();
                 destinations.put(destination,dest);
                 destinationMap.put(destination,dest);
-                // Add all consumers that are interested in the destination.
-                for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
-                    Subscription sub=(Subscription)iter.next();
-                    if(sub.matches(destination)){
-                        dest.addSubscription(context,sub);
-                    }
-                }
+                addSubscriptionsForDestination(context, dest);
             }
             return dest;
         }
     }
 
+	protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context,
+			Destination dest) throws Exception {
+		
+		ArrayList<Subscription> rc = new ArrayList<Subscription>();
+		// Add all consumers that are interested in the destination.
+		for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
+		    Subscription sub=(Subscription)iter.next();
+		    if(sub.matches(dest.getActiveMQDestination())){
+		        dest.addSubscription(context,sub);
+		        rc.add(sub);
+		    }
+		}
+		return rc;
+		
+	}
+
     public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout)
                     throws Exception{
 
@@ -205,7 +227,6 @@
                 return (Subscription)o;
             }
 
-            Subscription sub = createSubscription(context, info);
 
             // We may need to add some destinations that are in persistent store but not active
             // in the broker.
@@ -216,14 +237,9 @@
             // eagerly load all destinations into the broker but have an inactive state for the
             // destination which has reduced memory usage.
             //
-            Set inactiveDests = getInactiveDestinations();
-            for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
-            	ActiveMQDestination dest = (ActiveMQDestination) iter.next();
-            	if( sub.matches(dest) ) {
-            		context.getBroker().addDestination(context, dest);
-            	}
-            }
+            DestinationFilter destinationFilter = DestinationFilter.parseFilter(info.getDestination());
             
+            Subscription sub = createSubscription(context, info);
 
             subscriptions.put(info.getConsumerId(), sub);
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Mon Aug  6 10:07:55 2007
@@ -27,6 +27,8 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.TopicMessageStore;
 
 /**
  * 
@@ -53,4 +55,5 @@
     
     public Message[] browse();
     public String getName();
+	public MessageStore getMessageStore();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Mon Aug  6 10:07:55 2007
@@ -25,6 +25,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageStore;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -114,4 +115,8 @@
             dest.send(context, message);
         }
     }
+
+	public MessageStore getMessageStore() {
+		return next.getMessageStore();
+	}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Aug  6 10:07:55 2007
@@ -425,7 +425,7 @@
                 context.getConnection().dispatchAsync(md);
             }else{
                 context.getConnection().dispatchSync(md);
-                onDispatch(node,message);
+                onDispatch(node,message);                
             }
             //System.err.println(broker.getBrokerName() + " " + this + " (" + enqueueCounter + ", " + dispatchCounter +") " + node);
             return true;
@@ -439,11 +439,13 @@
             if(node!=QueueMessageReference.NULL_MESSAGE){
                 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
             }
-            try{
-                dispatchMatched();
-            }catch(IOException e){
-                context.getConnection().serviceExceptionAsync(e);
-            }
+        }
+        if( info.isDispatchAsync() ) {
+	        try{
+	            dispatchMatched();
+	        }catch(IOException e){
+	            context.getConnection().serviceExceptionAsync(e);
+	        }
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Aug  6 10:07:55 2007
@@ -23,6 +23,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
+
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
@@ -183,7 +184,13 @@
             }
             // Do we need to create the subscription?
             if (info == null) {
-                store.addSubsciption(clientId, subscriptionName, selector, subscription.getConsumerInfo().isRetroactive());
+            	info = new SubscriptionInfo();
+            	info.setClientId(clientId);
+            	info.setSelector(selector);
+            	info.setSubscriptionName(subscriptionName);
+            	info.setDestination(getActiveMQDestination()); // This destination is an actual destination id.
+            	info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); // This destination might be a pattern
+                store.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
             }
     
             final MessageEvaluationContext msgContext = new MessageEvaluationContext();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Mon Aug  6 10:07:55 2007
@@ -18,7 +18,9 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
 import javax.jms.InvalidDestinationException;
@@ -148,19 +150,14 @@
         return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage()
                 + "%";
     }
-
-    // Implementation methods
-    // -------------------------------------------------------------------------
-    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
-        Topic topic = (Topic) super.createDestination(context, destination);
- 
-        recoverDurableSubscriptions(context, topic);
-        
-        return topic;
-    }
-
-    private void recoverDurableSubscriptions(ConnectionContext context, Topic topic) throws IOException, JMSException, Exception {
-        TopicMessageStore store = (TopicMessageStore) topic.getMessageStore();
+    
+    @Override
+    protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
+    	
+    	List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);    	
+    	HashSet<Subscription> dupChecker = new HashSet<Subscription>(rc);
+    	
+        TopicMessageStore store = (TopicMessageStore) dest.getMessageStore();
         // Eagerly recover the durable subscriptions
         if (store != null) {            
             SubscriptionInfo[] infos = store.getAllSubscriptions();
@@ -181,16 +178,40 @@
                     sub = (DurableTopicSubscription) createSubscription(c, consumerInfo );
                 }
                 
-                topic.addSubscription(context, sub);
-            }            
+				if( dupChecker.contains(sub ) ) {
+					continue;
+				}
+
+                dupChecker.add(sub);
+                rc.add(sub);
+                dest.addSubscription(context, sub);
+            }
+            
+            // Now perhaps there other durable subscriptions (via wild card) that would match this destination..
+            durableSubscriptions.values();
+            for (Iterator iterator = durableSubscriptions.values().iterator(); iterator
+					.hasNext();) {
+				DurableTopicSubscription sub = (DurableTopicSubscription) iterator.next();
+				// Skip over subscriptions that we allready added..
+				if( dupChecker.contains(sub ) ) {
+					continue;
+				}
+				
+				if( sub.matches(dest.getActiveMQDestination()) ) {
+	                rc.add(sub);
+	                dest.addSubscription(context, sub);
+				}
+			}            
         }
+        
+        return rc;
     }
     
     private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
         ConsumerInfo rc = new ConsumerInfo();
         rc.setSelector(info.getSelector());
         rc.setSubscriptionName(info.getSubscriptionName());
-        rc.setDestination(info.getDestination());
+        rc.setDestination(info.getSubscribedDestination());
         rc.setConsumerId(createConsumerId());
         return rc;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java Mon Aug  6 10:07:55 2007
@@ -21,6 +21,7 @@
 
 
 /**
+ * Used to represent a durable subscription.
  * 
  * @openwire:marshaller code="55"
  * @version $Revision: 1.6 $
@@ -29,6 +30,7 @@
 
     public static final byte DATA_STRUCTURE_TYPE=CommandTypes.DURABLE_SUBSCRIPTION_INFO;
 
+    protected ActiveMQDestination subscribedDestination;
     protected ActiveMQDestination destination;
     protected String clientId;
     protected String subscriptionName;
@@ -50,6 +52,9 @@
     }
 
     /**
+     * This is the a resolved destination that the subscription is receiving messages from.
+     * This will never be a pattern or a composite destination.
+     * 
      * @openwire:property version=1 cache=true
      */
     public ActiveMQDestination getDestination() {
@@ -120,5 +125,24 @@
         }
         return result;
     }
+
+    /**
+     * The destination the client originally subscribed to.. This may not match the {@see getDestination} method
+     * if the subscribed destination uses patterns or composites.
+     *  
+     *  If the subscribed destinationis not set, this just ruturns the desitination.
+     *  
+     * @openwire:property version=3
+     */
+	public ActiveMQDestination getSubscribedDestination() {
+		if( subscribedDestination == null ) {
+			return getDestination();
+		}
+		return subscribedDestination;
+	}
+
+	public void setSubscribedDestination(ActiveMQDestination subscribedDestination) {
+		this.subscribedDestination = subscribedDestination;
+	}
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/SubscriptionInfoMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/SubscriptionInfoMarshaller.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/SubscriptionInfoMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/SubscriptionInfoMarshaller.java Mon Aug  6 10:07:55 2007
@@ -70,6 +70,7 @@
         info.setDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
         info.setSelector(tightUnmarshalString(dataIn, bs));
         info.setSubcriptionName(tightUnmarshalString(dataIn, bs));
+        info.setSubscribedDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
 
     }
 
@@ -86,6 +87,7 @@
         rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getDestination(), bs);
         rc += tightMarshalString1(info.getSelector(), bs);
         rc += tightMarshalString1(info.getSubcriptionName(), bs);
+        rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getSubscribedDestination(), bs);
 
         return rc + 0;
     }
@@ -105,6 +107,7 @@
         tightMarshalCachedObject2(wireFormat, (DataStructure)info.getDestination(), dataOut, bs);
         tightMarshalString2(info.getSelector(), dataOut, bs);
         tightMarshalString2(info.getSubcriptionName(), dataOut, bs);
+        tightMarshalNestedObject2(wireFormat, (DataStructure)info.getSubscribedDestination(), dataOut, bs);
 
     }
 
@@ -123,6 +126,7 @@
         info.setDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalCachedObject(wireFormat, dataIn));
         info.setSelector(looseUnmarshalString(dataIn));
         info.setSubcriptionName(looseUnmarshalString(dataIn));
+        info.setSubscribedDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalNestedObject(wireFormat, dataIn));
 
     }
 
@@ -139,6 +143,7 @@
         looseMarshalCachedObject(wireFormat, (DataStructure)info.getDestination(), dataOut);
         looseMarshalString(info.getSelector(), dataOut);
         looseMarshalString(info.getSubcriptionName(), dataOut);
+        looseMarshalNestedObject(wireFormat, (DataStructure)info.getSubscribedDestination(), dataOut);
 
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Mon Aug  6 10:07:55 2007
@@ -71,8 +71,8 @@
             throws IOException {
         delegate.acknowledge(context, clientId, subscriptionName, messageId);
     }
-    public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
-        delegate.addSubsciption(clientId, subscriptionName, selector, retroactive);
+    public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
+        delegate.addSubsciption(subscriptionInfo, retroactive);
     }
     public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
         delegate.deleteSubscription(clientId, subscriptionName);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Mon Aug  6 10:07:55 2007
@@ -106,7 +106,7 @@
     public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException;
 
     /**
-     * Lists all the durable subscirptions for a given destination.
+     * Lists all the durable subscriptions for a given destination.
      * 
      * @return an array SubscriptionInfos
      * @throws IOException
@@ -126,6 +126,6 @@
      * @throws IOException
      * 
      */
-    public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+    public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive)
                     throws IOException;
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java Mon Aug  6 10:07:55 2007
@@ -132,6 +132,6 @@
      * @throws IOException
      * 
      */
-    public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+    public void addSubsciption(SubscriptionInfo subscriptionInfo,boolean retroactive)
                     throws IOException;	
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Mon Aug  6 10:07:55 2007
@@ -70,9 +70,9 @@
         return topicReferenceStore.lookupSubscription(clientId,subscriptionName);
     }
 
-    public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+    public void addSubsciption(SubscriptionInfo subscriptionInfo,boolean retroactive)
             throws IOException{
-        topicReferenceStore.addSubsciption(clientId,subscriptionName,selector,retroactive);
+        topicReferenceStore.addSubsciption(subscriptionInfo,retroactive);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Mon Aug  6 10:07:55 2007
@@ -56,8 +56,7 @@
     public abstract void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,String clientId,
             String subscriptionName,long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception;
 
-    public abstract void doSetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId,
-            String subscriptionName,String selector,boolean retroactive) throws SQLException,IOException;
+    public abstract void doSetSubscriberEntry(TransactionContext c,SubscriptionInfo subscriptionInfo,boolean retroactive) throws SQLException,IOException;
 
     public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,
             String clientId,String subscriptionName) throws SQLException,IOException;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Mon Aug  6 10:07:55 2007
@@ -138,16 +138,16 @@
      * @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
      *      boolean)
      */
-    public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive)
+    public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive)
             throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
             c = persistenceAdapter.getTransactionContext();
-            adapter.doSetSubscriberEntry(c, destination, clientId, subscriptionName, selector, retroactive);
+            adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
             throw IOExceptionSupport
-                    .create("Failed to lookup subscription for info: " + clientId + ". Reason: " + e, e);
+                    .create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
         } finally {
             c.close();
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Mon Aug  6 10:07:55 2007
@@ -84,6 +84,7 @@
                     "CREATE INDEX " + getFullMessageTableName() + "_EIDX ON " + getFullMessageTableName()
                             + " (EXPIRATION)",
                     "CREATE TABLE " + getFullAckTableName() + "(" + "CONTAINER " + containerNameDataType + " NOT NULL"
+                            + ", SUB_DEST " + stringIdDataType 
                             + ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType
                             + " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType
                             + ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))", 
@@ -165,14 +166,14 @@
     public String getCreateDurableSubStatement() {
         if (createDurableSubStatement == null) {
             createDurableSubStatement = "INSERT INTO " + getFullAckTableName()
-                    + "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID) " + "VALUES (?, ?, ?, ?, ?)";
+                    + "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST) " + "VALUES (?, ?, ?, ?, ?, ?)";
         }
         return createDurableSubStatement;
     }
 
     public String getFindDurableSubStatement() {
         if (findDurableSubStatement == null) {
-            findDurableSubStatement = "SELECT SELECTOR, SUB_NAME " + "FROM " + getFullAckTableName()
+            findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName()
                     + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
         }
         return findDurableSubStatement;
@@ -180,7 +181,7 @@
 
     public String getFindAllDurableSubsStatement() {
         if (findAllDurableSubsStatement == null) {
-            findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID" + " FROM " + getFullAckTableName()
+            findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM " + getFullAckTableName()
                     + " WHERE CONTAINER=?";
         }
         return findAllDurableSubsStatement;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java Mon Aug  6 10:07:55 2007
@@ -48,6 +48,7 @@
                          "CREATE INDEX "+statements.getFullMessageTableName()+"_EIDX ON "+statements.getFullMessageTableName()+" (EXPIRATION)",                 
                          "CREATE TABLE "+statements.getFullAckTableName()+"("
                                +"CONTAINER "+statements.getContainerNameDataType()+" NOT NULL"
+                               +", SUB_DEST " + statements.getContainerNameDataType()
                                +", CLIENT_ID "+statements.getStringIdDataType()+" NOT NULL"
                                +", SUB_NAME "+statements.getStringIdDataType()+" NOT NULL"
                                +", SELECTOR "+statements.getStringIdDataType()

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Mon Aug  6 10:07:55 2007
@@ -431,8 +431,7 @@
      * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object,
      *      org.apache.activemq.service.SubscriptionInfo)
      */
-    public void doSetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId,
-            String subscriptionName,String selector,boolean retroactive) throws SQLException,IOException{
+    public void doSetSubscriberEntry(TransactionContext c,SubscriptionInfo info,boolean retroactive) throws SQLException,IOException{
         // dumpTables(c, destination.getQualifiedName(), clientId, subscriptionName);
         PreparedStatement s=null;
         try{
@@ -451,13 +450,14 @@
                 }
             }
             s=c.getConnection().prepareStatement(statements.getCreateDurableSubStatement());
-            s.setString(1,destination.getQualifiedName());
-            s.setString(2,clientId);
-            s.setString(3,subscriptionName);
-            s.setString(4,selector);
+            s.setString(1,info.getDestination().getQualifiedName());
+            s.setString(2,info.getClientId());
+            s.setString(3,info.getSubscriptionName());
+            s.setString(4,info.getSelector());
             s.setLong(5,lastMessageId);
+            s.setString(6, info.getSubscribedDestination().getQualifiedName());
             if(s.executeUpdate()!=1){
-                throw new IOException("Could not create durable subscription for: "+clientId);
+                throw new IOException("Could not create durable subscription for: "+info.getClientId());
             }
         }finally{
             close(s);
@@ -480,8 +480,9 @@
             SubscriptionInfo subscription=new SubscriptionInfo();
             subscription.setDestination(destination);
             subscription.setClientId(clientId);
-            subscription.setSubcriptionName(subscriptionName);
+            subscription.setSubscriptionName(subscriptionName);
             subscription.setSelector(rs.getString(1));
+            subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2), ActiveMQDestination.QUEUE_TYPE));
             return subscription;
         }finally{
             close(rs);
@@ -502,8 +503,9 @@
                 SubscriptionInfo subscription=new SubscriptionInfo();
                 subscription.setDestination(destination);
                 subscription.setSelector(rs.getString(1));
-                subscription.setSubcriptionName(rs.getString(2));
+                subscription.setSubscriptionName(rs.getString(2));
                 subscription.setClientId(rs.getString(3));
+                subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),ActiveMQDestination.QUEUE_TYPE));
                 rc.add(subscription);
             }
             return (SubscriptionInfo[])rc.toArray(new SubscriptionInfo[rc.size()]);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Mon Aug  6 10:07:55 2007
@@ -68,9 +68,9 @@
         return longTermStore.lookupSubscription(clientId, subscriptionName);
     }
 
-    public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
+    public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
         this.peristenceAdapter.checkpoint(true, true);
-        longTermStore.addSubsciption(clientId, subscriptionName, selector, retroactive);
+        longTermStore.addSubsciption(subscriptionInfo, retroactive);
     }
 
     public void addMessage(ConnectionContext context, Message message) throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Mon Aug  6 10:07:55 2007
@@ -27,6 +27,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.kaha.CommandMarshaller;
+import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.Marshaller;
@@ -71,7 +72,8 @@
         try{
             Store store=getStore();
             for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
-                Object obj=i.next();
+                ContainerId id=(ContainerId) i.next();
+                Object obj = id.getKey();
                 if(obj instanceof ActiveMQDestination){
                     rc.add((ActiveMQDestination)obj);
                 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Mon Aug  6 10:07:55 2007
@@ -211,7 +211,7 @@
         for(Iterator i=durableSubscribers.iterator();i.hasNext();){
             SubscriptionInfo info=(SubscriptionInfo)i.next();
             TopicReferenceStore ts=createTopicReferenceStore((ActiveMQTopic)info.getDestination());
-            ts.addSubsciption(info.getClientId(),info.getSubscriptionName(),info.getSelector(),false);
+            ts.addSubsciption(info,false);
         }
     }
 
@@ -247,6 +247,20 @@
             this.stateStore=createStateStore(getDirectory());
         }
         return this.stateStore;
+    }
+
+    public void deleteAllMessages() throws IOException{
+    	super.deleteAllMessages();
+        if(stateStore!=null){
+            if(stateStore.isInitialized()){
+            	stateStore.clear();
+            }else{
+            	stateStore.delete();
+            }
+        }else{
+            File stateDirectory=new File(getDirectory(),"kr-state");
+            StoreFactory.delete(stateDirectory.getAbsolutePath());
+        }
     }
 
     private Store createStateStore(File directory){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Mon Aug  6 10:07:55 2007
@@ -106,14 +106,9 @@
         return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
     }
 
-    public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+    public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive)
             throws IOException{
-        SubscriptionInfo info=new SubscriptionInfo();
-        info.setDestination(destination);
-        info.setClientId(clientId);
-        info.setSelector(selector);
-        info.setSubcriptionName(subscriptionName);
-        String key=getSubscriptionKey(clientId,subscriptionName);
+        String key=getSubscriptionKey(info.getClientId(),info.getSubscriptionName());
         // if already exists - won't add it again as it causes data files
         // to hang around
         if(!subscriberContainer.containsKey(key)){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Mon Aug  6 10:07:55 2007
@@ -18,6 +18,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -46,7 +47,7 @@
         subscriberContainer=subsContainer;
         // load all the Ack containers
         for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
-            Object key=i.next();
+            String key=(String) i.next();
             addSubscriberMessageContainer(key);
         }
     }
@@ -102,8 +103,8 @@
         }
     }
 
-    protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{
-        ListContainer container=store.getListContainer(key,"topic-subs-references");
+    protected ListContainer addSubscriberMessageContainer(String key) throws IOException{
+        ListContainer container=store.getListContainer(destination,"topic-subs-references-"+key);
         Marshaller marshaller=new ConsumerMessageRefMarshaller();
         container.setMarshaller(marshaller);
         TopicSubContainer tsc=new TopicSubContainer(container);
@@ -141,14 +142,9 @@
         }
     }
 
-    public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+    public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive)
             throws IOException{
-        SubscriptionInfo info=new SubscriptionInfo();
-        info.setDestination(destination);
-        info.setClientId(clientId);
-        info.setSelector(selector);
-        info.setSubcriptionName(subscriptionName);
-        String key=getSubscriptionKey(clientId,subscriptionName);
+        String key=getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
         // if already exists - won't add it again as it causes data files
         // to hang around
         if(!subscriberContainer.containsKey(key)){
@@ -253,7 +249,7 @@
         }
     }
 
-    protected void removeSubscriberMessageContainer(Object key) throws IOException{
+    protected void removeSubscriberMessageContainer(String key) throws IOException{
         subscriberContainer.remove(key);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
         for(Iterator i=container.iterator();i.hasNext();){
@@ -270,7 +266,7 @@
                 }
             }
         }
-        store.deleteListContainer(key,"topic-subs-references");
+        store.deleteListContainer(destination,"topic-subs-references-"+key);
     }
 
     protected String getSubscriptionKey(String clientId,String subscriberName){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Mon Aug  6 10:07:55 2007
@@ -73,14 +73,9 @@
         return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId,subscriptionName));
     }
 
-    public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+    public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive)
             throws IOException{
-        SubscriptionInfo info=new SubscriptionInfo();
-        info.setDestination(destination);
-        info.setClientId(clientId);
-        info.setSelector(selector);
-        info.setSubcriptionName(subscriptionName);
-        SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
+        SubscriptionKey key=new SubscriptionKey(info);
         MemoryTopicSub sub=new MemoryTopicSub();
         topicSubMap.put(key,sub);
         if(retroactive){

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java Mon Aug  6 10:07:55 2007
@@ -17,6 +17,8 @@
  */
 package org.apache.activemq.broker;
 
+import java.util.ArrayList;
+
 import javax.jms.DeliveryMode;
 
 import junit.framework.Test;
@@ -29,6 +31,7 @@
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
 
@@ -39,6 +42,113 @@
  */
 public class RecoveryBrokerTest extends BrokerRestartTestSupport {
         
+	/**
+	 * Used to verify that after a broker restart durable subscriptions that 
+	 * use wild cards are still wild card subscription after broker restart.
+	 * 
+	 * @throws Exception
+	 */
+    public void testWildCardSubscriptionPreservedOnRestart() throws Exception {
+        ActiveMQDestination dest1 = new ActiveMQTopic("TEST.A");
+        ActiveMQDestination dest2 = new ActiveMQTopic("TEST.B");
+        ActiveMQDestination dest3 = new ActiveMQTopic("TEST.C");
+        ActiveMQDestination wildDest = new ActiveMQTopic("TEST.>");
+        
+        ArrayList<MessageId> sentBeforeRestart = new ArrayList<MessageId>(); 
+        ArrayList<MessageId> sentBeforeCreateConsumer = new ArrayList<MessageId>(); 
+        ArrayList<MessageId> sentAfterCreateConsumer = new ArrayList<MessageId>(); 
+        
+        // Setup a first connection
+        {
+	        StubConnection connection1 = createConnection();
+	        ConnectionInfo connectionInfo1 = createConnectionInfo();
+	        connectionInfo1.setClientId("A");
+	        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+	        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+	        connection1.send(connectionInfo1);
+	        connection1.send(sessionInfo1);
+	        connection1.send(producerInfo1);
+	        
+	        // Create the durable subscription.
+	        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, wildDest);
+	        consumerInfo1.setSubscriptionName("test");
+	        consumerInfo1.setPrefetchSize(100);
+	        connection1.send(consumerInfo1);
+	        
+	        // Close the subscription.
+	        connection1.send(closeConsumerInfo(consumerInfo1));
+	
+	        // Send the messages
+	        for( int i=0; i < 4; i++) {
+	        	Message m = createMessage(producerInfo1, dest1, DeliveryMode.PERSISTENT);
+		        connection1.send(m);
+		        sentBeforeRestart.add(m.getMessageId());
+	        }
+	        connection1.request(closeConnectionInfo(connectionInfo1));
+	        connection1.stop();
+        }
+        
+        // Restart the broker.
+        restartBroker();
+        
+        // Get a connection to the new broker.
+        {
+	        StubConnection connection2 = createConnection();
+	        ConnectionInfo connectionInfo2 = createConnectionInfo();
+	        connectionInfo2.setClientId("A");
+	        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+	        connection2.send(connectionInfo2);
+	        connection2.send(sessionInfo2);
+	        
+	        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+	        connection2.send(producerInfo2);        
+
+	        // Send messages before the durable subscription is re-activated.	        
+	        for( int i=0; i < 4; i++) {
+	        	Message m = createMessage(producerInfo2, dest2, DeliveryMode.PERSISTENT);
+		        connection2.send(m);
+		        sentBeforeCreateConsumer.add(m.getMessageId());
+	        }
+	        
+	        // Re-open the subscription.
+	        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, wildDest);
+	        consumerInfo2.setSubscriptionName("test");
+	        consumerInfo2.setPrefetchSize(100);
+	        connection2.send(consumerInfo2);
+
+	        // Send messages after the subscription is activated.
+	        for( int i=0; i < 4; i++) {
+	        	Message m = createMessage(producerInfo2, dest3, DeliveryMode.PERSISTENT);
+		        connection2.send(m);
+		        sentAfterCreateConsumer.add(m.getMessageId());
+	        }
+	        	        
+	        // We should get the recovered messages...
+	        for( int i=0; i < 4 ; i++ ) {
+	            Message m2 = receiveMessage(connection2);
+	            assertNotNull("Recovered message missing: "+i, m2);
+	            assertEquals(sentBeforeRestart.get(i), m2.getMessageId());
+	        }
+	        
+	        // We should get get the messages that were sent before the sub was reactivated.
+	        for( int i=0; i < 4 ; i++ ) {
+	            Message m2 = receiveMessage(connection2);
+	            assertNotNull("Before activated message missing: "+i, m2);
+	            assertEquals(sentBeforeCreateConsumer.get(i), m2.getMessageId());
+	        }
+	        
+	        // We should get get the messages that were sent after the sub was reactivated.
+	        for( int i=0; i < 4 ; i++ ) {
+	            Message m2 = receiveMessage(connection2);
+	            assertNotNull("After activated message missing: "+i, m2);
+	            assertEquals(""+i, sentAfterCreateConsumer.get(i), m2.getMessageId());
+	        }
+	        
+	        assertNoMessagesLeft(connection2);
+        }
+        
+    }
+    
     public void testConsumedQueuePersistentMessagesLostOnRestart() throws Exception {
         
         ActiveMQDestination destination = new ActiveMQQueue("TEST");

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java Mon Aug  6 10:07:55 2007
@@ -39,9 +39,9 @@
         broker.start();
         slave = new BrokerService();
         slave.setBrokerName("slave");
-        slave.addConnector("tcp://localhost:62002");
         slave.setDeleteAllMessagesOnStartup(true);
         slave.setMasterConnectorURI("tcp://localhost:62001");
+        slave.addConnector("tcp://localhost:62002");
         slave.start();
         // wait for thing to connect
         Thread.sleep(1000);
@@ -62,8 +62,8 @@
     protected BrokerService createBroker() throws Exception,URISyntaxException{
         BrokerService broker=new BrokerService();
         broker.setBrokerName("master");
-        broker.addConnector("tcp://localhost:62001");
         broker.setDeleteAllMessagesOnStartup(true);
+        broker.addConnector("tcp://localhost:62001");
         return broker;
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/SubscriptionInfoTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/SubscriptionInfoTest.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/SubscriptionInfoTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/SubscriptionInfoTest.java Mon Aug  6 10:07:55 2007
@@ -55,5 +55,6 @@
         info.setDestination(createActiveMQDestination("Destination:2"));
         info.setSelector("Selector:3");
         info.setSubcriptionName("SubcriptionName:4");
+        info.setSubscribedDestination(createActiveMQDestination("SubscribedDestination:5"));
     }
 }

Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java (original)
+++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java Mon Aug  6 10:07:55 2007
@@ -59,14 +59,15 @@
 		adapter.commitEntityManager(context,manager);
 	}
 
-	public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
+	public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
 		EntityManager manager = adapter.beginEntityManager(null);
 		try {			
 			StoredSubscription ss = new StoredSubscription();
-			ss.setClientId(clientId);
-			ss.setSubscriptionName(subscriptionName);
+			ss.setClientId(info.getClientId());
+			ss.setSubscriptionName(info.getSubscriptionName());
 			ss.setDestination(destinationName);
-			ss.setSelector(selector);
+			ss.setSelector(info.getSelector());
+			ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName());
 			ss.setLastAckedId(-1);
 			
 			if( !retroactive ) {
@@ -125,7 +126,8 @@
 				info.setClientId(ss.getClientId());
 				info.setDestination(destination);
 				info.setSelector(ss.getSelector());
-				info.setSubcriptionName(ss.getSubscriptionName());
+				info.setSubscriptionName(ss.getSubscriptionName());
+				info.setSubscribedDestination(toSubscribedDestination(ss));
 				l.add(info);
 	        }
 			
@@ -171,7 +173,8 @@
 				rc.setClientId(ss.getClientId());
 				rc.setDestination(destination);
 				rc.setSelector(ss.getSelector());
-				rc.setSubcriptionName(ss.getSubscriptionName());
+				rc.setSubscriptionName(ss.getSubscriptionName());
+				rc.setSubscribedDestination(toSubscribedDestination(ss));
 			}
 		} catch (Throwable e) {
 			adapter.rollbackEntityManager(null,manager);
@@ -179,6 +182,12 @@
 		}
 		adapter.commitEntityManager(null,manager);
 		return rc;
+	}
+
+	private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) {
+		if( ss.getSubscribedDestination() == null )
+			return null;
+		return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE);
 	}
 
 	public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {

Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java (original)
+++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java Mon Aug  6 10:07:55 2007
@@ -57,14 +57,15 @@
 		adapter.commitEntityManager(context,manager);
 	}
 
-	public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
+	public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
 		EntityManager manager = adapter.beginEntityManager(null);
 		try {			
 			StoredSubscription ss = new StoredSubscription();
-			ss.setClientId(clientId);
-			ss.setSubscriptionName(subscriptionName);
+			ss.setClientId(info.getClientId());
+			ss.setSubscriptionName(info.getSubcriptionName());
 			ss.setDestination(destinationName);
-			ss.setSelector(selector);
+			ss.setSelector(info.getSelector());
+			ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName());
 			ss.setLastAckedId(-1);
 			
 			if( !retroactive ) {
@@ -123,7 +124,8 @@
 				info.setClientId(ss.getClientId());
 				info.setDestination(destination);
 				info.setSelector(ss.getSelector());
-				info.setSubcriptionName(ss.getSubscriptionName());
+				info.setSubscriptionName(ss.getSubscriptionName());
+				info.setSubscribedDestination(toSubscribedDestination(ss));
 				l.add(info);
 	        }
 			
@@ -136,6 +138,12 @@
 		adapter.commitEntityManager(null,manager);		
 		return rc;
 	}
+	
+	private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) {
+		if( ss.getSubscribedDestination() == null )
+			return null;
+		return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE);
+	}
 
 	public int getMessageCount(String clientId, String subscriptionName) throws IOException {
 		Long rc;
@@ -169,7 +177,8 @@
 				rc.setClientId(ss.getClientId());
 				rc.setDestination(destination);
 				rc.setSelector(ss.getSelector());
-				rc.setSubcriptionName(ss.getSubscriptionName());
+				rc.setSubscriptionName(ss.getSubscriptionName());
+				rc.setSubscribedDestination(toSubscribedDestination(ss));
 			}
 		} catch (Throwable e) {
 			adapter.rollbackEntityManager(null,manager);

Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java (original)
+++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java Mon Aug  6 10:07:55 2007
@@ -106,7 +106,8 @@
     private long lastAckedId;
     @Basic
     private String selector;
-
+    @Basic
+    private String subscribedDestination;
 
 	public long getLastAckedId() {
 		return lastAckedId;
@@ -154,5 +155,13 @@
 
 	public void setId(long id) {
 		this.id = id;
+	}
+
+	public String getSubscribedDestination() {
+		return subscribedDestination;
+	}
+
+	public void setSubscribedDestination(String subscribedDestination) {
+		this.subscribedDestination = subscribedDestination;
 	}
 }