You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/03/29 00:10:36 UTC
svn commit: r389614 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker:
BrokerService.java region/DurableTopicSubscription.java
region/RegionBroker.java region/Topic.java region/TopicRegion.java
Author: chirino
Date: Tue Mar 28 14:10:34 2006
New Revision: 389614
URL: http://svn.apache.org/viewcvs?rev=389614&view=rev
Log:
Working on https://issues.apache.org/activemq/browse/AMQ-669
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=389614&r1=389613&r2=389614&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Mar 28 14:10:34 2006
@@ -116,6 +116,8 @@
private AtomicBoolean started = new AtomicBoolean(false);
private BrokerPlugin[] plugins;
+ private boolean keepDurableSubsActive;
+
/**
* Adds a new transport connector for the given bind address
*
@@ -908,6 +910,7 @@
else {
regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter());
}
+ regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
regionBroker.setBrokerName(getBrokerName());
return regionBroker;
}
@@ -1120,5 +1123,13 @@
*/
public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure){
this.shutdownOnMasterFailure=shutdownOnMasterFailure;
+ }
+
+ public boolean isKeepDurableSubsActive() {
+ return keepDurableSubsActive;
+ }
+
+ public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
+ this.keepDurableSubsActive = keepDurableSubsActive;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=389614&r1=389613&r2=389614&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Mar 28 14:10:34 2006
@@ -36,10 +36,12 @@
private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
private final ConcurrentHashMap destinations = new ConcurrentHashMap();
private final SubscriptionKey subscriptionKey;
+ private final boolean keepDurableSubsActive;
private boolean active=false;
- public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+ public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
super(broker,context, info);
+ this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
}
@@ -57,10 +59,13 @@
synchronized public void add(ConnectionContext context, Destination destination) throws Exception {
super.add(context, destination);
destinations.put(destination.getActiveMQDestination(), destination);
- if( active ) {
+ if( active || keepDurableSubsActive ) {
Topic topic = (Topic) destination;
topic.activate(context, this);
}
+ if( !isFull() ) {
+ dispatchMatched();
+ }
}
synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
@@ -68,21 +73,25 @@
this.active = true;
this.context = context;
this.info = info;
- for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
- Topic topic = (Topic) iter.next();
- topic.activate(context, this);
+ if( !keepDurableSubsActive ) {
+ for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
+ Topic topic = (Topic) iter.next();
+ topic.activate(context, this);
+ }
}
- if( !isFull() ) {
+ if( !isFull() ) {
dispatchMatched();
}
}
}
- synchronized public void deactivate() throws Exception {
+ synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {
active=false;
- for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
- Topic topic = (Topic) iter.next();
- topic.deactivate(context, this);
+ if( !keepDurableSubsActive ) {
+ for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
+ Topic topic = (Topic) iter.next();
+ topic.deactivate(context, this);
+ }
}
for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
@@ -115,7 +124,7 @@
}
synchronized public void add(MessageReference node) throws Exception {
- if( !active ) {
+ if( !active && !keepDurableSubsActive ) {
return;
}
node = new IndirectMessageReference(node.getRegionDestination(), (Message) node);
@@ -123,14 +132,14 @@
node.decrementReferenceCount();
}
- public int getPendingQueueSize(){
- if (active){
+ public int getPendingQueueSize() {
+ if( active || keepDurableSubsActive ) {
return super.getPendingQueueSize();
}
//TODO: need to get from store
return 0;
}
-
+
public void setSelector(String selector) throws InvalidSelectorException {
throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=389614&r1=389613&r2=389614&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Mar 28 14:10:34 2006
@@ -71,6 +71,7 @@
private final Region tempTopicRegion;
private BrokerService brokerService;
private boolean stopped = false;
+ private boolean keepDurableSubsActive=false;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
@@ -125,6 +126,7 @@
public void start() throws Exception {
+ ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
queueRegion.start();
topicRegion.start();
tempQueueRegion.start();
@@ -477,6 +479,14 @@
ss.stop(topicRegion);
ss.stop(tempQueueRegion);
ss.stop(tempTopicRegion);
+ }
+
+ public boolean isKeepDurableSubsActive() {
+ return keepDurableSubsActive;
+ }
+
+ public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
+ this.keepDurableSubsActive = keepDurableSubsActive;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=389614&r1=389613&r2=389614&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Mar 28 14:10:34 2006
@@ -127,13 +127,7 @@
}
sub.remove(context, this);
}
-
- public void addInactiveSubscription(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
- sub.add(context, this);
- destinationStatistics.getConsumers().increment();
- durableSubcribers.put(sub.getSubscriptionKey(), sub);
- }
-
+
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
if (store != null) {
store.deleteSubscription(key.clientId, key.subscriptionName);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=389614&r1=389613&r2=389614&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Tue Mar 28 14:10:34 2006
@@ -16,30 +16,32 @@
*/
package org.apache.activemq.broker.region;
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
-import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-
-import java.util.Iterator;
-import java.util.Set;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
*
@@ -47,9 +49,10 @@
*/
public class TopicRegion extends AbstractRegion {
private static final Log log = LogFactory.getLog(TopicRegion.class);
-
protected final ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap();
-
+ private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
+ private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
+ private boolean keepDurableSubsActive=false;
public TopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter persistenceAdapter) {
@@ -116,7 +119,7 @@
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
if (sub != null) {
- sub.deactivate();
+ sub.deactivate(keepDurableSubsActive);
}
}
@@ -166,24 +169,32 @@
// A single durable sub may be subscribing to multiple topics. so it might exist already.
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
- if( sub == null ) {
- sub = (DurableTopicSubscription) createSubscription(context, createInactiveConsumerInfo(info));
+ ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
+ if( sub == null ) {
+ sub = (DurableTopicSubscription) createSubscription(context, consumerInfo );
}
- topic.addInactiveSubscription(context, sub);
+
+ subscriptions.put(consumerInfo.getConsumerId(), sub);
+ topic.addSubscription(context, sub);
}
}
return topic;
}
- private static ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
+ private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
ConsumerInfo rc = new ConsumerInfo();
rc.setSelector(info.getSelector());
rc.setSubcriptionName(info.getSubcriptionName());
rc.setDestination(info.getDestination());
+ rc.setConsumerId(createConsumerId());
return rc;
}
+ private ConsumerId createConsumerId() {
+ return new ConsumerId(recoveredDurableSubSessionId,recoveredDurableSubIdGenerator.getNextSequenceId());
+ }
+
protected void configureTopic(Topic topic, ActiveMQDestination destination) {
if (broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
@@ -198,7 +209,7 @@
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
if (sub == null) {
- sub = new DurableTopicSubscription(broker,context, info);
+ sub = new DurableTopicSubscription(broker,context, info, keepDurableSubsActive);
durableSubscriptions.put(key, sub);
}
else {
@@ -239,6 +250,14 @@
iter.remove();
}
return inactiveDestinations;
+ }
+
+ public boolean isKeepDurableSubsActive() {
+ return keepDurableSubsActive;
+ }
+
+ public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
+ this.keepDurableSubsActive = keepDurableSubsActive;
}
}