You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/06 19:07:57 UTC
svn commit: r563194 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/broker/region/
activemq-core/src/main/java/org/apache/activemq/command/
activemq-core/src/main/java/org/apache/activemq/openwire/v3/
activemq-core/src/main/java/o...
Author: chirino
Date: Mon Aug 6 10:07:55 2007
New Revision: 563194
URL: http://svn.apache.org/viewvc?view=rev&rev=563194
Log:
Fix for AMQ-1356 : Durable Subscriptions do not work with Wildcards after broker is restarted.
- Added a RecoveryBrokerTest.testWildCardSubscriptionPreservedOnRestart() test case that was showing
that wildcards did not work with DurableSubscriptions
- Fix the TransactedTopicMasterSlaveTest so that setDeleteAllMessagesOnStartup() takes effect (had to be done before the connectors are added).
- Change the MessageStore interface so that subscriptions are created using the data in the SubscriptionInfo class
- Added a subscribedDestination field to the SubscriptionInfo so that the original wildcard subscription can be remembered
- The KahaReference store now deletes its State store too when deleteAllMessages() is called
- Fixed KahaPersistenceAdapter.getDestinations() so that it actually returns all the destinations.
- We now recover all the topics eagerly when the topic region is started.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/SubscriptionInfoMarshaller.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/SubscriptionInfoTest.java
activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java
activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java
activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Mon Aug 6 10:07:55 2007
@@ -17,10 +17,13 @@
*/
package org.apache.activemq.broker.region;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException;
@@ -36,14 +39,13 @@
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
+import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
*
* @version $Revision: 1.14 $
@@ -81,6 +83,16 @@
public void start() throws Exception {
started = true;
+
+ Set inactiveDests = getInactiveDestinations();
+ for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
+ ActiveMQDestination dest = (ActiveMQDestination) iter.next();
+
+ ConnectionContext context = new ConnectionContext();
+ context.setBroker(broker.getBrokerService().getBroker());
+ context.getBroker().addDestination(context , dest);
+ }
+
for (Iterator i = destinations.values().iterator();i.hasNext();) {
Destination dest = (Destination)i.next();
dest.start();
@@ -110,18 +122,28 @@
dest.start();
destinations.put(destination,dest);
destinationMap.put(destination,dest);
- // Add all consumers that are interested in the destination.
- for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
- Subscription sub=(Subscription)iter.next();
- if(sub.matches(destination)){
- dest.addSubscription(context,sub);
- }
- }
+ addSubscriptionsForDestination(context, dest);
}
return dest;
}
}
+ protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context,
+ Destination dest) throws Exception {
+
+ ArrayList<Subscription> rc = new ArrayList<Subscription>();
+ // Add all consumers that are interested in the destination.
+ for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
+ Subscription sub=(Subscription)iter.next();
+ if(sub.matches(dest.getActiveMQDestination())){
+ dest.addSubscription(context,sub);
+ rc.add(sub);
+ }
+ }
+ return rc;
+
+ }
+
public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout)
throws Exception{
@@ -205,7 +227,6 @@
return (Subscription)o;
}
- Subscription sub = createSubscription(context, info);
// We may need to add some destinations that are in persistent store but not active
// in the broker.
@@ -216,14 +237,9 @@
// eagerly load all destinations into the broker but have an inactive state for the
// destination which has reduced memory usage.
//
- Set inactiveDests = getInactiveDestinations();
- for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
- ActiveMQDestination dest = (ActiveMQDestination) iter.next();
- if( sub.matches(dest) ) {
- context.getBroker().addDestination(context, dest);
- }
- }
+ DestinationFilter destinationFilter = DestinationFilter.parseFilter(info.getDestination());
+ Subscription sub = createSubscription(context, info);
subscriptions.put(info.getConsumerId(), sub);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Mon Aug 6 10:07:55 2007
@@ -27,6 +27,8 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.TopicMessageStore;
/**
*
@@ -53,4 +55,5 @@
public Message[] browse();
public String getName();
+ public MessageStore getMessageStore();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Mon Aug 6 10:07:55 2007
@@ -25,6 +25,7 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageStore;
import java.io.IOException;
import java.util.Iterator;
@@ -114,4 +115,8 @@
dest.send(context, message);
}
}
+
+ public MessageStore getMessageStore() {
+ return next.getMessageStore();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Aug 6 10:07:55 2007
@@ -425,7 +425,7 @@
context.getConnection().dispatchAsync(md);
}else{
context.getConnection().dispatchSync(md);
- onDispatch(node,message);
+ onDispatch(node,message);
}
//System.err.println(broker.getBrokerName() + " " + this + " (" + enqueueCounter + ", " + dispatchCounter +") " + node);
return true;
@@ -439,11 +439,13 @@
if(node!=QueueMessageReference.NULL_MESSAGE){
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
}
- try{
- dispatchMatched();
- }catch(IOException e){
- context.getConnection().serviceExceptionAsync(e);
- }
+ }
+ if( info.isDispatchAsync() ) {
+ try{
+ dispatchMatched();
+ }catch(IOException e){
+ context.getConnection().serviceExceptionAsync(e);
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Aug 6 10:07:55 2007
@@ -23,6 +23,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
+
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@@ -183,7 +184,13 @@
}
// Do we need to create the subscription?
if (info == null) {
- store.addSubsciption(clientId, subscriptionName, selector, subscription.getConsumerInfo().isRetroactive());
+ info = new SubscriptionInfo();
+ info.setClientId(clientId);
+ info.setSelector(selector);
+ info.setSubscriptionName(subscriptionName);
+ info.setDestination(getActiveMQDestination()); // This destination is an actual destination id.
+ info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); // This destination might be a pattern
+ store.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
}
final MessageEvaluationContext msgContext = new MessageEvaluationContext();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Mon Aug 6 10:07:55 2007
@@ -18,7 +18,9 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import javax.jms.InvalidDestinationException;
@@ -148,19 +150,14 @@
return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage()
+ "%";
}
-
- // Implementation methods
- // -------------------------------------------------------------------------
- protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
- Topic topic = (Topic) super.createDestination(context, destination);
-
- recoverDurableSubscriptions(context, topic);
-
- return topic;
- }
-
- private void recoverDurableSubscriptions(ConnectionContext context, Topic topic) throws IOException, JMSException, Exception {
- TopicMessageStore store = (TopicMessageStore) topic.getMessageStore();
+
+ @Override
+ protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
+
+ List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
+ HashSet<Subscription> dupChecker = new HashSet<Subscription>(rc);
+
+ TopicMessageStore store = (TopicMessageStore) dest.getMessageStore();
// Eagerly recover the durable subscriptions
if (store != null) {
SubscriptionInfo[] infos = store.getAllSubscriptions();
@@ -181,16 +178,40 @@
sub = (DurableTopicSubscription) createSubscription(c, consumerInfo );
}
- topic.addSubscription(context, sub);
- }
+ if( dupChecker.contains(sub ) ) {
+ continue;
+ }
+
+ dupChecker.add(sub);
+ rc.add(sub);
+ dest.addSubscription(context, sub);
+ }
+
+ // Now perhaps there other durable subscriptions (via wild card) that would match this destination..
+ durableSubscriptions.values();
+ for (Iterator iterator = durableSubscriptions.values().iterator(); iterator
+ .hasNext();) {
+ DurableTopicSubscription sub = (DurableTopicSubscription) iterator.next();
+ // Skip over subscriptions that we allready added..
+ if( dupChecker.contains(sub ) ) {
+ continue;
+ }
+
+ if( sub.matches(dest.getActiveMQDestination()) ) {
+ rc.add(sub);
+ dest.addSubscription(context, sub);
+ }
+ }
}
+
+ return rc;
}
private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
ConsumerInfo rc = new ConsumerInfo();
rc.setSelector(info.getSelector());
rc.setSubscriptionName(info.getSubscriptionName());
- rc.setDestination(info.getDestination());
+ rc.setDestination(info.getSubscribedDestination());
rc.setConsumerId(createConsumerId());
return rc;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java Mon Aug 6 10:07:55 2007
@@ -21,6 +21,7 @@
/**
+ * Used to represent a durable subscription.
*
* @openwire:marshaller code="55"
* @version $Revision: 1.6 $
@@ -29,6 +30,7 @@
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.DURABLE_SUBSCRIPTION_INFO;
+ protected ActiveMQDestination subscribedDestination;
protected ActiveMQDestination destination;
protected String clientId;
protected String subscriptionName;
@@ -50,6 +52,9 @@
}
/**
+ * This is the a resolved destination that the subscription is receiving messages from.
+ * This will never be a pattern or a composite destination.
+ *
* @openwire:property version=1 cache=true
*/
public ActiveMQDestination getDestination() {
@@ -120,5 +125,24 @@
}
return result;
}
+
+ /**
+ * The destination the client originally subscribed to.. This may not match the {@see getDestination} method
+ * if the subscribed destination uses patterns or composites.
+ *
+ * If the subscribed destinationis not set, this just ruturns the desitination.
+ *
+ * @openwire:property version=3
+ */
+ public ActiveMQDestination getSubscribedDestination() {
+ if( subscribedDestination == null ) {
+ return getDestination();
+ }
+ return subscribedDestination;
+ }
+
+ public void setSubscribedDestination(ActiveMQDestination subscribedDestination) {
+ this.subscribedDestination = subscribedDestination;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/SubscriptionInfoMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/SubscriptionInfoMarshaller.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/SubscriptionInfoMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/SubscriptionInfoMarshaller.java Mon Aug 6 10:07:55 2007
@@ -70,6 +70,7 @@
info.setDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
info.setSelector(tightUnmarshalString(dataIn, bs));
info.setSubcriptionName(tightUnmarshalString(dataIn, bs));
+ info.setSubscribedDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
}
@@ -86,6 +87,7 @@
rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getDestination(), bs);
rc += tightMarshalString1(info.getSelector(), bs);
rc += tightMarshalString1(info.getSubcriptionName(), bs);
+ rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getSubscribedDestination(), bs);
return rc + 0;
}
@@ -105,6 +107,7 @@
tightMarshalCachedObject2(wireFormat, (DataStructure)info.getDestination(), dataOut, bs);
tightMarshalString2(info.getSelector(), dataOut, bs);
tightMarshalString2(info.getSubcriptionName(), dataOut, bs);
+ tightMarshalNestedObject2(wireFormat, (DataStructure)info.getSubscribedDestination(), dataOut, bs);
}
@@ -123,6 +126,7 @@
info.setDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalCachedObject(wireFormat, dataIn));
info.setSelector(looseUnmarshalString(dataIn));
info.setSubcriptionName(looseUnmarshalString(dataIn));
+ info.setSubscribedDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalNestedObject(wireFormat, dataIn));
}
@@ -139,6 +143,7 @@
looseMarshalCachedObject(wireFormat, (DataStructure)info.getDestination(), dataOut);
looseMarshalString(info.getSelector(), dataOut);
looseMarshalString(info.getSubcriptionName(), dataOut);
+ looseMarshalNestedObject(wireFormat, (DataStructure)info.getSubscribedDestination(), dataOut);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Mon Aug 6 10:07:55 2007
@@ -71,8 +71,8 @@
throws IOException {
delegate.acknowledge(context, clientId, subscriptionName, messageId);
}
- public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
- delegate.addSubsciption(clientId, subscriptionName, selector, retroactive);
+ public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
+ delegate.addSubsciption(subscriptionInfo, retroactive);
}
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
delegate.deleteSubscription(clientId, subscriptionName);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Mon Aug 6 10:07:55 2007
@@ -106,7 +106,7 @@
public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException;
/**
- * Lists all the durable subscirptions for a given destination.
+ * Lists all the durable subscriptions for a given destination.
*
* @return an array SubscriptionInfos
* @throws IOException
@@ -126,6 +126,6 @@
* @throws IOException
*
*/
- public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+ public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive)
throws IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java Mon Aug 6 10:07:55 2007
@@ -132,6 +132,6 @@
* @throws IOException
*
*/
- public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+ public void addSubsciption(SubscriptionInfo subscriptionInfo,boolean retroactive)
throws IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Mon Aug 6 10:07:55 2007
@@ -70,9 +70,9 @@
return topicReferenceStore.lookupSubscription(clientId,subscriptionName);
}
- public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+ public void addSubsciption(SubscriptionInfo subscriptionInfo,boolean retroactive)
throws IOException{
- topicReferenceStore.addSubsciption(clientId,subscriptionName,selector,retroactive);
+ topicReferenceStore.addSubsciption(subscriptionInfo,retroactive);
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Mon Aug 6 10:07:55 2007
@@ -56,8 +56,7 @@
public abstract void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,String clientId,
String subscriptionName,long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception;
- public abstract void doSetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId,
- String subscriptionName,String selector,boolean retroactive) throws SQLException,IOException;
+ public abstract void doSetSubscriberEntry(TransactionContext c,SubscriptionInfo subscriptionInfo,boolean retroactive) throws SQLException,IOException;
public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,
String clientId,String subscriptionName) throws SQLException,IOException;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Mon Aug 6 10:07:55 2007
@@ -138,16 +138,16 @@
* @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
* boolean)
*/
- public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive)
+ public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive)
throws IOException {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
c = persistenceAdapter.getTransactionContext();
- adapter.doSetSubscriberEntry(c, destination, clientId, subscriptionName, selector, retroactive);
+ adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
throw IOExceptionSupport
- .create("Failed to lookup subscription for info: " + clientId + ". Reason: " + e, e);
+ .create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
} finally {
c.close();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Mon Aug 6 10:07:55 2007
@@ -84,6 +84,7 @@
"CREATE INDEX " + getFullMessageTableName() + "_EIDX ON " + getFullMessageTableName()
+ " (EXPIRATION)",
"CREATE TABLE " + getFullAckTableName() + "(" + "CONTAINER " + containerNameDataType + " NOT NULL"
+ + ", SUB_DEST " + stringIdDataType
+ ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType
+ " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType
+ ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))",
@@ -165,14 +166,14 @@
public String getCreateDurableSubStatement() {
if (createDurableSubStatement == null) {
createDurableSubStatement = "INSERT INTO " + getFullAckTableName()
- + "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID) " + "VALUES (?, ?, ?, ?, ?)";
+ + "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST) " + "VALUES (?, ?, ?, ?, ?, ?)";
}
return createDurableSubStatement;
}
public String getFindDurableSubStatement() {
if (findDurableSubStatement == null) {
- findDurableSubStatement = "SELECT SELECTOR, SUB_NAME " + "FROM " + getFullAckTableName()
+ findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName()
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
return findDurableSubStatement;
@@ -180,7 +181,7 @@
public String getFindAllDurableSubsStatement() {
if (findAllDurableSubsStatement == null) {
- findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID" + " FROM " + getFullAckTableName()
+ findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM " + getFullAckTableName()
+ " WHERE CONTAINER=?";
}
return findAllDurableSubsStatement;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java Mon Aug 6 10:07:55 2007
@@ -48,6 +48,7 @@
"CREATE INDEX "+statements.getFullMessageTableName()+"_EIDX ON "+statements.getFullMessageTableName()+" (EXPIRATION)",
"CREATE TABLE "+statements.getFullAckTableName()+"("
+"CONTAINER "+statements.getContainerNameDataType()+" NOT NULL"
+ +", SUB_DEST " + statements.getContainerNameDataType()
+", CLIENT_ID "+statements.getStringIdDataType()+" NOT NULL"
+", SUB_NAME "+statements.getStringIdDataType()+" NOT NULL"
+", SELECTOR "+statements.getStringIdDataType()
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Mon Aug 6 10:07:55 2007
@@ -431,8 +431,7 @@
* @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object,
* org.apache.activemq.service.SubscriptionInfo)
*/
- public void doSetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId,
- String subscriptionName,String selector,boolean retroactive) throws SQLException,IOException{
+ public void doSetSubscriberEntry(TransactionContext c,SubscriptionInfo info,boolean retroactive) throws SQLException,IOException{
// dumpTables(c, destination.getQualifiedName(), clientId, subscriptionName);
PreparedStatement s=null;
try{
@@ -451,13 +450,14 @@
}
}
s=c.getConnection().prepareStatement(statements.getCreateDurableSubStatement());
- s.setString(1,destination.getQualifiedName());
- s.setString(2,clientId);
- s.setString(3,subscriptionName);
- s.setString(4,selector);
+ s.setString(1,info.getDestination().getQualifiedName());
+ s.setString(2,info.getClientId());
+ s.setString(3,info.getSubscriptionName());
+ s.setString(4,info.getSelector());
s.setLong(5,lastMessageId);
+ s.setString(6, info.getSubscribedDestination().getQualifiedName());
if(s.executeUpdate()!=1){
- throw new IOException("Could not create durable subscription for: "+clientId);
+ throw new IOException("Could not create durable subscription for: "+info.getClientId());
}
}finally{
close(s);
@@ -480,8 +480,9 @@
SubscriptionInfo subscription=new SubscriptionInfo();
subscription.setDestination(destination);
subscription.setClientId(clientId);
- subscription.setSubcriptionName(subscriptionName);
+ subscription.setSubscriptionName(subscriptionName);
subscription.setSelector(rs.getString(1));
+ subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2), ActiveMQDestination.QUEUE_TYPE));
return subscription;
}finally{
close(rs);
@@ -502,8 +503,9 @@
SubscriptionInfo subscription=new SubscriptionInfo();
subscription.setDestination(destination);
subscription.setSelector(rs.getString(1));
- subscription.setSubcriptionName(rs.getString(2));
+ subscription.setSubscriptionName(rs.getString(2));
subscription.setClientId(rs.getString(3));
+ subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),ActiveMQDestination.QUEUE_TYPE));
rc.add(subscription);
}
return (SubscriptionInfo[])rc.toArray(new SubscriptionInfo[rc.size()]);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Mon Aug 6 10:07:55 2007
@@ -68,9 +68,9 @@
return longTermStore.lookupSubscription(clientId, subscriptionName);
}
- public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
+ public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
this.peristenceAdapter.checkpoint(true, true);
- longTermStore.addSubsciption(clientId, subscriptionName, selector, retroactive);
+ longTermStore.addSubsciption(subscriptionInfo, retroactive);
}
public void addMessage(ConnectionContext context, Message message) throws IOException {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Mon Aug 6 10:07:55 2007
@@ -27,6 +27,7 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.CommandMarshaller;
+import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
@@ -71,7 +72,8 @@
try{
Store store=getStore();
for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
- Object obj=i.next();
+ ContainerId id=(ContainerId) i.next();
+ Object obj = id.getKey();
if(obj instanceof ActiveMQDestination){
rc.add((ActiveMQDestination)obj);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Mon Aug 6 10:07:55 2007
@@ -211,7 +211,7 @@
for(Iterator i=durableSubscribers.iterator();i.hasNext();){
SubscriptionInfo info=(SubscriptionInfo)i.next();
TopicReferenceStore ts=createTopicReferenceStore((ActiveMQTopic)info.getDestination());
- ts.addSubsciption(info.getClientId(),info.getSubscriptionName(),info.getSelector(),false);
+ ts.addSubsciption(info,false);
}
}
@@ -247,6 +247,20 @@
this.stateStore=createStateStore(getDirectory());
}
return this.stateStore;
+ }
+
+ public void deleteAllMessages() throws IOException{
+ super.deleteAllMessages();
+ if(stateStore!=null){
+ if(stateStore.isInitialized()){
+ stateStore.clear();
+ }else{
+ stateStore.delete();
+ }
+ }else{
+ File stateDirectory=new File(getDirectory(),"kr-state");
+ StoreFactory.delete(stateDirectory.getAbsolutePath());
+ }
}
private Store createStateStore(File directory){
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Mon Aug 6 10:07:55 2007
@@ -106,14 +106,9 @@
return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
}
- public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+ public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive)
throws IOException{
- SubscriptionInfo info=new SubscriptionInfo();
- info.setDestination(destination);
- info.setClientId(clientId);
- info.setSelector(selector);
- info.setSubcriptionName(subscriptionName);
- String key=getSubscriptionKey(clientId,subscriptionName);
+ String key=getSubscriptionKey(info.getClientId(),info.getSubscriptionName());
// if already exists - won't add it again as it causes data files
// to hang around
if(!subscriberContainer.containsKey(key)){
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Mon Aug 6 10:07:55 2007
@@ -18,6 +18,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -46,7 +47,7 @@
subscriberContainer=subsContainer;
// load all the Ack containers
for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
- Object key=i.next();
+ String key=(String) i.next();
addSubscriberMessageContainer(key);
}
}
@@ -102,8 +103,8 @@
}
}
- protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{
- ListContainer container=store.getListContainer(key,"topic-subs-references");
+ protected ListContainer addSubscriberMessageContainer(String key) throws IOException{
+ ListContainer container=store.getListContainer(destination,"topic-subs-references-"+key);
Marshaller marshaller=new ConsumerMessageRefMarshaller();
container.setMarshaller(marshaller);
TopicSubContainer tsc=new TopicSubContainer(container);
@@ -141,14 +142,9 @@
}
}
- public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+ public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive)
throws IOException{
- SubscriptionInfo info=new SubscriptionInfo();
- info.setDestination(destination);
- info.setClientId(clientId);
- info.setSelector(selector);
- info.setSubcriptionName(subscriptionName);
- String key=getSubscriptionKey(clientId,subscriptionName);
+ String key=getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
// if already exists - won't add it again as it causes data files
// to hang around
if(!subscriberContainer.containsKey(key)){
@@ -253,7 +249,7 @@
}
}
- protected void removeSubscriberMessageContainer(Object key) throws IOException{
+ protected void removeSubscriberMessageContainer(String key) throws IOException{
subscriberContainer.remove(key);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
for(Iterator i=container.iterator();i.hasNext();){
@@ -270,7 +266,7 @@
}
}
}
- store.deleteListContainer(key,"topic-subs-references");
+ store.deleteListContainer(destination,"topic-subs-references-"+key);
}
protected String getSubscriptionKey(String clientId,String subscriberName){
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Mon Aug 6 10:07:55 2007
@@ -73,14 +73,9 @@
return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId,subscriptionName));
}
- public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+ public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive)
throws IOException{
- SubscriptionInfo info=new SubscriptionInfo();
- info.setDestination(destination);
- info.setClientId(clientId);
- info.setSelector(selector);
- info.setSubcriptionName(subscriptionName);
- SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
+ SubscriptionKey key=new SubscriptionKey(info);
MemoryTopicSub sub=new MemoryTopicSub();
topicSubMap.put(key,sub);
if(retroactive){
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java Mon Aug 6 10:07:55 2007
@@ -17,6 +17,8 @@
*/
package org.apache.activemq.broker;
+import java.util.ArrayList;
+
import javax.jms.DeliveryMode;
import junit.framework.Test;
@@ -29,6 +31,7 @@
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
@@ -39,6 +42,113 @@
*/
public class RecoveryBrokerTest extends BrokerRestartTestSupport {
+ /**
+ * Verifies that durable subscriptions that use wildcards are still
+ * wildcard subscriptions after the broker is restarted.
+ *
+ * @throws Exception
+ */
+ public void testWildCardSubscriptionPreservedOnRestart() throws Exception {
+ ActiveMQDestination dest1 = new ActiveMQTopic("TEST.A");
+ ActiveMQDestination dest2 = new ActiveMQTopic("TEST.B");
+ ActiveMQDestination dest3 = new ActiveMQTopic("TEST.C");
+ ActiveMQDestination wildDest = new ActiveMQTopic("TEST.>");
+
+ ArrayList<MessageId> sentBeforeRestart = new ArrayList<MessageId>();
+ ArrayList<MessageId> sentBeforeCreateConsumer = new ArrayList<MessageId>();
+ ArrayList<MessageId> sentAfterCreateConsumer = new ArrayList<MessageId>();
+
+ // Setup a first connection
+ {
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ connectionInfo1.setClientId("A");
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo1);
+
+ // Create the durable subscription.
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, wildDest);
+ consumerInfo1.setSubscriptionName("test");
+ consumerInfo1.setPrefetchSize(100);
+ connection1.send(consumerInfo1);
+
+ // Close the subscription.
+ connection1.send(closeConsumerInfo(consumerInfo1));
+
+ // Send the messages
+ for( int i=0; i < 4; i++) {
+ Message m = createMessage(producerInfo1, dest1, DeliveryMode.PERSISTENT);
+ connection1.send(m);
+ sentBeforeRestart.add(m.getMessageId());
+ }
+ connection1.request(closeConnectionInfo(connectionInfo1));
+ connection1.stop();
+ }
+
+ // Restart the broker.
+ restartBroker();
+
+ // Get a connection to the new broker.
+ {
+ StubConnection connection2 = createConnection();
+ ConnectionInfo connectionInfo2 = createConnectionInfo();
+ connectionInfo2.setClientId("A");
+ SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ connection2.send(connectionInfo2);
+ connection2.send(sessionInfo2);
+
+ ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+ connection2.send(producerInfo2);
+
+ // Send messages before the durable subscription is re-activated.
+ for( int i=0; i < 4; i++) {
+ Message m = createMessage(producerInfo2, dest2, DeliveryMode.PERSISTENT);
+ connection2.send(m);
+ sentBeforeCreateConsumer.add(m.getMessageId());
+ }
+
+ // Re-open the subscription.
+ ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, wildDest);
+ consumerInfo2.setSubscriptionName("test");
+ consumerInfo2.setPrefetchSize(100);
+ connection2.send(consumerInfo2);
+
+ // Send messages after the subscription is activated.
+ for( int i=0; i < 4; i++) {
+ Message m = createMessage(producerInfo2, dest3, DeliveryMode.PERSISTENT);
+ connection2.send(m);
+ sentAfterCreateConsumer.add(m.getMessageId());
+ }
+
+ // We should get the recovered messages...
+ for( int i=0; i < 4 ; i++ ) {
+ Message m2 = receiveMessage(connection2);
+ assertNotNull("Recovered message missing: "+i, m2);
+ assertEquals(sentBeforeRestart.get(i), m2.getMessageId());
+ }
+
+ // We should get the messages that were sent before the sub was reactivated.
+ for( int i=0; i < 4 ; i++ ) {
+ Message m2 = receiveMessage(connection2);
+ assertNotNull("Before activated message missing: "+i, m2);
+ assertEquals(sentBeforeCreateConsumer.get(i), m2.getMessageId());
+ }
+
+ // We should get the messages that were sent after the sub was reactivated.
+ for( int i=0; i < 4 ; i++ ) {
+ Message m2 = receiveMessage(connection2);
+ assertNotNull("After activated message missing: "+i, m2);
+ assertEquals(""+i, sentAfterCreateConsumer.get(i), m2.getMessageId());
+ }
+
+ assertNoMessagesLeft(connection2);
+ }
+
+ }
+
public void testConsumedQueuePersistentMessagesLostOnRestart() throws Exception {
ActiveMQDestination destination = new ActiveMQQueue("TEST");
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java Mon Aug 6 10:07:55 2007
@@ -39,9 +39,9 @@
broker.start();
slave = new BrokerService();
slave.setBrokerName("slave");
- slave.addConnector("tcp://localhost:62002");
slave.setDeleteAllMessagesOnStartup(true);
slave.setMasterConnectorURI("tcp://localhost:62001");
+ slave.addConnector("tcp://localhost:62002");
slave.start();
// wait for thing to connect
Thread.sleep(1000);
@@ -62,8 +62,8 @@
protected BrokerService createBroker() throws Exception,URISyntaxException{
BrokerService broker=new BrokerService();
broker.setBrokerName("master");
- broker.addConnector("tcp://localhost:62001");
broker.setDeleteAllMessagesOnStartup(true);
+ broker.addConnector("tcp://localhost:62001");
return broker;
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/SubscriptionInfoTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/SubscriptionInfoTest.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/SubscriptionInfoTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/SubscriptionInfoTest.java Mon Aug 6 10:07:55 2007
@@ -55,5 +55,6 @@
info.setDestination(createActiveMQDestination("Destination:2"));
info.setSelector("Selector:3");
info.setSubcriptionName("SubcriptionName:4");
+ info.setSubscribedDestination(createActiveMQDestination("SubscribedDestination:5"));
}
}
Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java (original)
+++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java Mon Aug 6 10:07:55 2007
@@ -59,14 +59,15 @@
adapter.commitEntityManager(context,manager);
}
- public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
+ public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
EntityManager manager = adapter.beginEntityManager(null);
try {
StoredSubscription ss = new StoredSubscription();
- ss.setClientId(clientId);
- ss.setSubscriptionName(subscriptionName);
+ ss.setClientId(info.getClientId());
+ ss.setSubscriptionName(info.getSubscriptionName());
ss.setDestination(destinationName);
- ss.setSelector(selector);
+ ss.setSelector(info.getSelector());
+ ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName());
ss.setLastAckedId(-1);
if( !retroactive ) {
@@ -125,7 +126,8 @@
info.setClientId(ss.getClientId());
info.setDestination(destination);
info.setSelector(ss.getSelector());
- info.setSubcriptionName(ss.getSubscriptionName());
+ info.setSubscriptionName(ss.getSubscriptionName());
+ info.setSubscribedDestination(toSubscribedDestination(ss));
l.add(info);
}
@@ -171,7 +173,8 @@
rc.setClientId(ss.getClientId());
rc.setDestination(destination);
rc.setSelector(ss.getSelector());
- rc.setSubcriptionName(ss.getSubscriptionName());
+ rc.setSubscriptionName(ss.getSubscriptionName());
+ rc.setSubscribedDestination(toSubscribedDestination(ss));
}
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
@@ -179,6 +182,12 @@
}
adapter.commitEntityManager(null,manager);
return rc;
+ }
+
+ private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) {
+ if( ss.getSubscribedDestination() == null )
+ return null;
+ return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE);
}
public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java (original)
+++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java Mon Aug 6 10:07:55 2007
@@ -57,14 +57,15 @@
adapter.commitEntityManager(context,manager);
}
- public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
+ public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
EntityManager manager = adapter.beginEntityManager(null);
try {
StoredSubscription ss = new StoredSubscription();
- ss.setClientId(clientId);
- ss.setSubscriptionName(subscriptionName);
+ ss.setClientId(info.getClientId());
+ ss.setSubscriptionName(info.getSubcriptionName());
ss.setDestination(destinationName);
- ss.setSelector(selector);
+ ss.setSelector(info.getSelector());
+ ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName());
ss.setLastAckedId(-1);
if( !retroactive ) {
@@ -123,7 +124,8 @@
info.setClientId(ss.getClientId());
info.setDestination(destination);
info.setSelector(ss.getSelector());
- info.setSubcriptionName(ss.getSubscriptionName());
+ info.setSubscriptionName(ss.getSubscriptionName());
+ info.setSubscribedDestination(toSubscribedDestination(ss));
l.add(info);
}
@@ -136,6 +138,12 @@
adapter.commitEntityManager(null,manager);
return rc;
}
+
+ private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) {
+ if( ss.getSubscribedDestination() == null )
+ return null;
+ return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE);
+ }
public int getMessageCount(String clientId, String subscriptionName) throws IOException {
Long rc;
@@ -169,7 +177,8 @@
rc.setClientId(ss.getClientId());
rc.setDestination(destination);
rc.setSelector(ss.getSelector());
- rc.setSubcriptionName(ss.getSubscriptionName());
+ rc.setSubscriptionName(ss.getSubscriptionName());
+ rc.setSubscribedDestination(toSubscribedDestination(ss));
}
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java (original)
+++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java Mon Aug 6 10:07:55 2007
@@ -106,7 +106,8 @@
private long lastAckedId;
@Basic
private String selector;
-
+ @Basic
+ private String subscribedDestination;
public long getLastAckedId() {
return lastAckedId;
@@ -154,5 +155,13 @@
public void setId(long id) {
this.id = id;
+ }
+
+ public String getSubscribedDestination() {
+ return subscribedDestination;
+ }
+
+ public void setSubscribedDestination(String subscribedDestination) {
+ this.subscribedDestination = subscribedDestination;
}
}