You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/28 22:03:54 UTC

svn commit: r490814 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/

Author: rajdavies
Date: Thu Dec 28 13:03:53 2006
New Revision: 490814

URL: http://svn.apache.org/viewvc?view=rev&rev=490814
Log:
Use the store based cursor by default for Queues - which will enable very large queue support

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=490814&r1=490813&r2=490814
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Thu Dec 28 13:03:53 2006
@@ -38,6 +38,7 @@
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import sun.security.x509.IssuerAlternativeNameExtension;
 
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -60,6 +61,7 @@
     protected final TaskRunnerFactory taskRunnerFactory;
     protected final Object destinationsMutex = new Object();
     protected final Map consumerChangeMutexMap = new HashMap();
+    protected boolean started = false;
 
     public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
         if (broker == null) {
@@ -76,9 +78,15 @@
     }
 
     public void start() throws Exception {
+        started = true;
+        for (Iterator i = destinations.values().iterator();i.hasNext();) {
+            Destination dest = (Destination)i.next();
+            dest.start();
+        }
     }
     
     public void stop() throws Exception {
+        started = false;
         for (Iterator i = destinations.values().iterator();i.hasNext();) {
             Destination dest = (Destination)i.next();
             dest.stop();
@@ -102,7 +110,7 @@
                 if (destinationInterceptor != null) {
                     dest = destinationInterceptor.intercept(dest);
                 }
-
+                
                 dest.start();
 
                 destinations.put(destination, dest);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?view=diff&rev=490814&r1=490813&r2=490814
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Thu Dec 28 13:03:53 2006
@@ -77,7 +77,7 @@
         if (destination.isQueue()) {
             if (destination.isTemporary()) {
                 final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
-                return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
+                return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()) {
                    
                     public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
                         // Only consumers on the same connection can consume from 
@@ -90,7 +90,7 @@
                 };
             } else {
                 MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
-                Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
+                Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory,broker.getTempDataStore());
                 configureQueue(queue, destination);
                 queue.initialize();
                 return queue;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=490814&r1=490813&r2=490814
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Thu Dec 28 13:03:53 2006
@@ -19,22 +19,21 @@
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.util.SubscriptionKey;
-import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class DurableTopicSubscription extends PrefetchSubscription {
-    
+    static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
     private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
     private final ConcurrentHashMap destinations = new ConcurrentHashMap();
     private final SubscriptionKey subscriptionKey;
@@ -72,6 +71,7 @@
     }
    
     public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
+        log.debug("Deactivating " + this);
         if( !active ) {
             this.active = true;
             this.context = context;
@@ -96,7 +96,8 @@
         }
     }
 
-    synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {        
+    synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {   
+   
         active=false;
         synchronized(pending){
             pending.stop();
@@ -197,9 +198,12 @@
             "DurableTopicSubscription:" +
             " consumer="+info.getConsumerId()+
             ", destinations="+destinations.size()+
-            ", dispatched="+dispatched.size()+
-            ", delivered="+this.prefetchExtension+
-            ", pending="+getPendingQueueSize();
+            ", total="+enqueueCounter+
+            ", pending="+getPendingQueueSize()+
+            ", dispatched="+dispatchCounter+
+            ", inflight="+dispatched.size()+
+            ", prefetchExtension="+this.prefetchExtension;
+            
     }
 
     public String getClientId() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=490814&r1=490813&r2=490814
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu Dec 28 13:03:53 2006
@@ -327,6 +327,10 @@
         return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9);
     }
     
+    public int countBeforeFull() {
+        return info.getPrefetchSize() + prefetchExtension - dispatched.size();
+    }
+    
     public int getPendingQueueSize(){
     	synchronized(pending) {
     		return pending.size();
@@ -396,28 +400,38 @@
                 List toDispatch=null;
                 synchronized(pending){
                     try{
-                        pending.reset();
-                        while(pending.hasNext()&&!isFull()){
-                            MessageReference node=pending.next();
-                            pending.remove();
-                            // Message may have been sitting in the pending list a while
-                            // waiting for the consumer to ak the message.
-                            if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
-                                continue; // just drop it.
+                        int numberToDispatch=countBeforeFull();
+                        if(numberToDispatch>0){
+                            int count=0;
+                            pending.reset();
+                            while(pending.hasNext()&&!isFull()&&count<numberToDispatch){
+                                MessageReference node=pending.next();
+                               
+                                if(canDispatch(node)){
+                                    pending.remove();
+                                    // Message may have been sitting in the pending list a while
+                                    // waiting for the consumer to ak the message.
+                                    if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
+                                        continue; // just drop it.
+                                    }
+                                    if(toDispatch==null){
+                                        toDispatch=new ArrayList();
+                                    }
+                                    toDispatch.add(node);
+                                    count++;
+                                }
                             }
-                            if(toDispatch==null){
-                                toDispatch=new ArrayList();
-                            }
-                            toDispatch.add(node);
                         }
                     }finally{
                         pending.release();
                     }
                 }
                 if(toDispatch!=null){
-                    for(int i=0;i<toDispatch.size();i++){
-                        MessageReference node=(MessageReference)toDispatch.get(i);
-                        dispatch(node);
+                    synchronized(dispatched){
+                        for(int i=0;i<toDispatch.size();i++){
+                            MessageReference node=(MessageReference)toDispatch.get(i);
+                            dispatch(node);
+                        }
                     }
                 }
             }finally{
@@ -458,6 +472,7 @@
                 }
                 return true;
             }else{
+                QueueMessageReference n = (QueueMessageReference) node;
                 return false;
             }
         }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=490814&r1=490813&r2=490814
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Dec 28 13:03:53 2006
@@ -28,6 +28,7 @@
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
 import org.apache.activemq.broker.region.group.MessageGroupMap;
@@ -44,6 +45,7 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.kaha.Store;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.MessageRecoveryListener;
@@ -74,7 +76,7 @@
     private final Valve dispatchValve = new Valve(true);
     private final UsageManager usageManager;
     private final DestinationStatistics destinationStatistics = new DestinationStatistics();
-    private  PendingMessageCursor messages = new VMPendingMessageCursor();
+    private  PendingMessageCursor messages;
     private final LinkedList pagedInMessages = new LinkedList();
 
     private LockOwner exclusiveOwner;
@@ -92,13 +94,20 @@
     private final Object exclusiveLockMutex = new Object();
     private final Object doDispatchMutex = new Object();
     private TaskRunner taskRunner;
+    private boolean started = false;
 
     public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
-            TaskRunnerFactory taskFactory) throws Exception {
+            TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
         this.destination = destination;
         this.usageManager = new UsageManager(memoryManager);
         this.usageManager.setLimit(Long.MAX_VALUE);
         this.store = store;
+        if(destination.isTemporary()){
+            this.messages=new VMPendingMessageCursor();
+        }else{
+            this.messages=new StoreQueueCursor(this,tmpStore);
+        }
+        
         this.taskRunner = taskFactory.createTaskRunner(this, "Queue  "+destination.getPhysicalName());
 
         // Let the store know what usage manager we are using so that he can
@@ -118,18 +127,16 @@
         if(store!=null){
             // Restore the persistent messages.
             messages.setUsageManager(getUsageManager());
-            messages.start();
             if(messages.isRecoveryRequired()){
                 store.recover(new MessageRecoveryListener(){
 
                     public void recoverMessage(Message message){
-                    	// Message could have expired while it was being loaded..
-                    	if( message.isExpired() ) {
-                    		// TODO: remove message from store.
-                    		return;
-                    	}
-
-                    	message.setRegionDestination(Queue.this);
+                        // Message could have expired while it was being loaded..
+                        if(message.isExpired()){
+                            // TODO remove from store
+                            return;
+                        }
+                        message.setRegionDestination(Queue.this);
                         synchronized(messages){
                             try{
                                 messages.addMessageLast(message);
@@ -157,10 +164,12 @@
 
     /**
      * Lock a node
+     * 
      * @param node
      * @param lockOwner
      * @return true if can be locked
-     * @see org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference, org.apache.activemq.broker.region.LockOwner)
+     * @see org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference,
+     *      org.apache.activemq.broker.region.LockOwner)
      */
     public boolean lock(MessageReference node,LockOwner lockOwner){
         synchronized(exclusiveLockMutex){
@@ -309,46 +318,60 @@
     }
 
     public void send(final ConnectionContext context,final Message message) throws Exception{
-    	// There is delay between the client sending it and it arriving at the
-    	// destination.. it may have expired.
-    	if( message.isExpired() ) {
-    		return;
-    	}
-    		
+        // There is delay between the client sending it and it arriving at the
+        // destination.. it may have expired.
+        if(message.isExpired()){
+            if (log.isDebugEnabled()) {
+                log.debug("Expired message: " + message);
+            }
+            return;
+        }
         if(context.isProducerFlowControl()){
             if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
                 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
             }else{
                 usageManager.waitForSpace();
-                
                 // The usage manager could have delayed us by the time
                 // we unblock the message could have expired..
-            	if( message.isExpired() ) {
-            		return;
-            	}
+                if(message.isExpired()){
+                    if (log.isDebugEnabled()) {
+                        log.debug("Expired message: " + message);
+                    }
+                    return;
+                }
             }
         }
         message.setRegionDestination(this);
-        if (store != null && message.isPersistent()) {
-            store.addMessage(context, message);
+        if(store!=null&&message.isPersistent()){
+            store.addMessage(context,message);
         }
         if(context.isInTransaction()){
             context.getTransaction().addSynchronization(new Synchronization(){
 
                 public void afterCommit() throws Exception{
-                	
-                	// It could take while before we receive the commit
-                	// operration.. by that time the message could have expired..
-                	if( message.isExpired() ) {
-                		// TODO: remove message from store.
-                		return;
-                	}
-
+                    //even though the message could be expired - it won't be from the store
+                    //and it's important to keep the store/cursor in step
+                    synchronized(messages){
+                        messages.addMessageLast(message);
+                    }
+                    // It could take while before we receive the commit
+                    // operration.. by that time the message could have expired..
+                    if(message.isExpired()){
+                        // TODO: remove message from store.
+                        if (log.isDebugEnabled()) {
+                            log.debug("Expired message: " + message);
+                        }
+                        return;
+                    }
                     sendMessage(context,message);
                 }
             });
         }else{
+            synchronized(messages){
+                messages.addMessageLast(message);
+            }
             sendMessage(context,message);
+            
         }
     }
        
@@ -432,12 +455,19 @@
     }
 
     public void start() throws Exception {
+        started = true;
+        messages.start();
+        doPageIn(false);
     }
 
     public void stop() throws Exception {
+        started = false;
         if( taskRunner!=null ) {
             taskRunner.shutdown();
         }
+        if(messages!=null){
+            messages.stop();
+        }
     }
 
     // Properties
@@ -528,6 +558,11 @@
 
     public Message[] browse() {
         ArrayList l = new ArrayList();
+        try{
+            doPageIn(true);
+        }catch(Exception e){
+            log.error("caught an exception browsing " + this,e);
+        }
         synchronized(pagedInMessages) {
             for (Iterator i = pagedInMessages.iterator();i.hasNext();) {
                 MessageReference r = (MessageReference)i.next();
@@ -538,7 +573,7 @@
                         l.add(m);
                     }
                 }catch(IOException e){
-                    log.error("caught an exception brwsing " + this,e);
+                    log.error("caught an exception browsing " + this,e);
                 }
                 finally {
                     r.decrementReferenceCount();
@@ -850,11 +885,10 @@
         return answer;
     }
     
+      
     private void sendMessage(final ConnectionContext context,Message msg) throws Exception{
         
-        synchronized(messages){
-            messages.addMessageLast(msg);
-        }
+        
         destinationStatistics.getEnqueues().increment();
         destinationStatistics.getMessages().increment();
         pageInMessages(false);
@@ -863,10 +897,11 @@
     private List doPageIn() throws Exception{
         return doPageIn(true);
     }
+    
     private List doPageIn(boolean force) throws Exception{
         final int toPageIn=maximumPagedInMessages-pagedInMessages.size();
         List result=null;
-        if((force || !consumers.isEmpty())&&toPageIn>0){
+        if((force||!consumers.isEmpty())&&toPageIn>0){
             try{
                 dispatchValve.increment();
                 int count=0;
@@ -877,9 +912,15 @@
                         while(messages.hasNext()&&count<toPageIn){
                             MessageReference node=messages.next();
                             messages.remove();
-                            node=createMessageReference(node.getMessage());
-                            result.add(node);
-                            count++;
+                            if(!node.isExpired()){
+                                node=createMessageReference(node.getMessage());
+                                result.add(node);
+                                count++;
+                            }else{
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Expired message: " + node);
+                                }
+                            }
                         }
                     }finally{
                         messages.release();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?view=diff&rev=490814&r1=490813&r2=490814
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Thu Dec 28 13:03:53 2006
@@ -73,7 +73,7 @@
     
     protected boolean canDispatch(MessageReference n) throws IOException {
         QueueMessageReference node = (QueueMessageReference) n;
-        if( node.isAcked() )
+        if( node.isAcked())
             return false;
         // Keep message groups together.
         String groupId = node.getGroupID();
@@ -208,7 +208,7 @@
     
     /**
      */
-    synchronized public void destroy() {        
+    public void destroy() {        
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=490814&r1=490813&r2=490814
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Dec 28 13:03:53 2006
@@ -78,7 +78,7 @@
     private final Region tempQueueRegion;
     private final Region tempTopicRegion;
     private BrokerService brokerService;
-    private boolean stopped = false;
+    private boolean started = false;
     private boolean keepDurableSubsActive=false;
     
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
@@ -178,6 +178,7 @@
     
     public void start() throws Exception {
         ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
+        started = true;
         queueRegion.start();
         topicRegion.start();
         tempQueueRegion.start();
@@ -185,7 +186,7 @@
     }
 
     public void stop() throws Exception {
-        stopped = true;
+        started = false;
         ServiceStopper ss = new ServiceStopper();
         doStop(ss);
         ss.throwFirstException();
@@ -245,7 +246,6 @@
         if( destinations.contains(destination) ){
             throw new JMSException("Destination already exists: "+destination);
         }
-        
         Destination answer = null;
         switch(destination.getDestinationType()) {
         case ActiveMQDestination.QUEUE_TYPE:
@@ -366,7 +366,8 @@
     }
 
     public void send(ConnectionContext context,  Message message) throws Exception {
-        message.getMessageId().setBrokerSequenceId(sequenceGenerator.getNextSequenceId());
+        long si = sequenceGenerator.getNextSequenceId();
+        message.getMessageId().setBrokerSequenceId(si);
         if (message.getTimestamp() > 0 && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) { 
             //timestamp not been disabled and has not passed through a network
             message.setTimestamp(System.currentTimeMillis());
@@ -541,7 +542,7 @@
     }
     
     public boolean isStopped(){
-        return stopped;
+        return !started;
     }
     
     public Set getDurableDestinations(){