You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/03/29 00:10:36 UTC

svn commit: r389614 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: BrokerService.java region/DurableTopicSubscription.java region/RegionBroker.java region/Topic.java region/TopicRegion.java

Author: chirino
Date: Tue Mar 28 14:10:34 2006
New Revision: 389614

URL: http://svn.apache.org/viewcvs?rev=389614&view=rev
Log:
Working on https://issues.apache.org/activemq/browse/AMQ-669

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=389614&r1=389613&r2=389614&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Mar 28 14:10:34 2006
@@ -116,6 +116,8 @@
     private AtomicBoolean started = new AtomicBoolean(false);
     private BrokerPlugin[] plugins;
 
+    private boolean keepDurableSubsActive;
+
     /**
      * Adds a new transport connector for the given bind address
      *
@@ -908,6 +910,7 @@
         else {
 			regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter());
         }
+        regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
 		regionBroker.setBrokerName(getBrokerName());
 		return regionBroker;
 	}
@@ -1120,5 +1123,13 @@
      */
     public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure){
         this.shutdownOnMasterFailure=shutdownOnMasterFailure;
+    }
+
+    public boolean isKeepDurableSubsActive() {
+        return keepDurableSubsActive;
+    }
+
+    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
+        this.keepDurableSubsActive = keepDurableSubsActive;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=389614&r1=389613&r2=389614&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Mar 28 14:10:34 2006
@@ -36,10 +36,12 @@
     private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
     private final ConcurrentHashMap destinations = new ConcurrentHashMap();
     private final SubscriptionKey subscriptionKey;
+    private final boolean keepDurableSubsActive;
     private boolean active=false;
     
-    public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+    public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
         super(broker,context, info);
+        this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
     }
     
@@ -57,10 +59,13 @@
     synchronized public void add(ConnectionContext context, Destination destination) throws Exception {
         super.add(context, destination);
         destinations.put(destination.getActiveMQDestination(), destination);
-        if( active ) {
+        if( active || keepDurableSubsActive ) {
             Topic topic = (Topic) destination;            
             topic.activate(context, this);
         }
+        if( !isFull() ) {
+            dispatchMatched();
+        }
     }
    
     synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
@@ -68,21 +73,25 @@
             this.active = true;
             this.context = context;
             this.info = info;
-            for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
-                Topic topic = (Topic) iter.next();
-                topic.activate(context, this);
+            if( !keepDurableSubsActive ) {
+                for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
+                    Topic topic = (Topic) iter.next();
+                    topic.activate(context, this);
+                }
             }
-            if( !isFull() ) {                            
+            if( !isFull() ) {
                 dispatchMatched();
             }
         }
     }
 
-    synchronized public void deactivate() throws Exception {        
+    synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {        
         active=false;
-        for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
-            Topic topic = (Topic) iter.next();
-            topic.deactivate(context, this);
+        if( !keepDurableSubsActive ) {
+            for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
+                Topic topic = (Topic) iter.next();
+                topic.deactivate(context, this);
+            }
         }
         for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
 
@@ -115,7 +124,7 @@
     }
 
     synchronized public void add(MessageReference node) throws Exception {
-        if( !active ) {
+        if( !active && !keepDurableSubsActive ) {
             return;
         }
         node = new IndirectMessageReference(node.getRegionDestination(), (Message) node);
@@ -123,14 +132,14 @@
         node.decrementReferenceCount();
     }
     
-    public int getPendingQueueSize(){
-        if (active){
+    public int getPendingQueueSize() {
+        if( active || keepDurableSubsActive ) {
             return super.getPendingQueueSize();
         }
         //TODO: need to get from store
         return 0;
     }
-    
+   
     public void setSelector(String selector) throws InvalidSelectorException {
         throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=389614&r1=389613&r2=389614&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Mar 28 14:10:34 2006
@@ -71,6 +71,7 @@
     private final Region tempTopicRegion;
     private BrokerService brokerService;
     private boolean stopped = false;
+    private boolean keepDurableSubsActive=false;
     
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
     
@@ -125,6 +126,7 @@
     
     
     public void start() throws Exception {
+        ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
         queueRegion.start();
         topicRegion.start();
         tempQueueRegion.start();
@@ -477,6 +479,14 @@
         ss.stop(topicRegion);
         ss.stop(tempQueueRegion);
         ss.stop(tempTopicRegion);
+    }
+
+    public boolean isKeepDurableSubsActive() {
+        return keepDurableSubsActive;
+    }
+
+    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
+        this.keepDurableSubsActive = keepDurableSubsActive;
     }
 
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=389614&r1=389613&r2=389614&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Mar 28 14:10:34 2006
@@ -127,13 +127,7 @@
         }
         sub.remove(context, this);
     }
-    
-    public void addInactiveSubscription(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
-        sub.add(context, this);        
-        destinationStatistics.getConsumers().increment();
-        durableSubcribers.put(sub.getSubscriptionKey(), sub);
-    }
-   
+       
     public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
         if (store != null) {
             store.deleteSubscription(key.clientId, key.subscriptionName);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=389614&r1=389613&r2=389614&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Tue Mar 28 14:10:34 2006
@@ -16,30 +16,32 @@
  */
 package org.apache.activemq.broker.region;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
 
-import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-
-import java.util.Iterator;
-import java.util.Set;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 
 /**
  * 
@@ -47,9 +49,10 @@
  */
 public class TopicRegion extends AbstractRegion {
     private static final Log log = LogFactory.getLog(TopicRegion.class);
-    
     protected final ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap();
-   
+    private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
+    private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId()); 
+    private boolean keepDurableSubsActive=false;
 
     public TopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
             PersistenceAdapter persistenceAdapter) {
@@ -116,7 +119,7 @@
             SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
             DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
             if (sub != null) {
-                sub.deactivate();
+                sub.deactivate(keepDurableSubsActive);
             }
 
         }
@@ -166,24 +169,32 @@
                 
                 // A single durable sub may be subscribing to multiple topics.  so it might exist already.
                 DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
-                if( sub == null ) {
-                    sub = (DurableTopicSubscription) createSubscription(context, createInactiveConsumerInfo(info));
+                ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
+                if( sub == null ) { 
+                    sub = (DurableTopicSubscription) createSubscription(context, consumerInfo );
                 }
-                topic.addInactiveSubscription(context, sub);
+                
+                subscriptions.put(consumerInfo.getConsumerId(), sub);
+                topic.addSubscription(context, sub);
             }            
         }
         
         return topic;
     }
     
-    private static ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
+    private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
         ConsumerInfo rc = new ConsumerInfo();
         rc.setSelector(info.getSelector());
         rc.setSubcriptionName(info.getSubcriptionName());
         rc.setDestination(info.getDestination());
+        rc.setConsumerId(createConsumerId());
         return rc;
     }
 
+    private ConsumerId createConsumerId() {
+        return new ConsumerId(recoveredDurableSubSessionId,recoveredDurableSubIdGenerator.getNextSequenceId());
+    }
+
     protected void configureTopic(Topic topic, ActiveMQDestination destination) {
         if (broker.getDestinationPolicy() != null) {
             PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
@@ -198,7 +209,7 @@
             SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
             DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
             if (sub == null) {
-                sub = new DurableTopicSubscription(broker,context, info);
+                sub = new DurableTopicSubscription(broker,context, info, keepDurableSubsActive);
                 durableSubscriptions.put(key, sub);
             }
             else {
@@ -239,6 +250,14 @@
                 iter.remove();
         }
         return inactiveDestinations;
+    }
+
+    public boolean isKeepDurableSubsActive() {
+        return keepDurableSubsActive;
+    }
+
+    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
+        this.keepDurableSubsActive = keepDurableSubsActive;
     }
 
 }