You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/01/25 00:08:38 UTC

svn commit: r372050 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ft/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/broker/ft/

Author: rajdavies
Date: Tue Jan 24 15:08:29 2006
New Revision: 372050

URL: http://svn.apache.org/viewcvs?rev=372050&view=rev
Log:
Fixes for Master-Slave functionality

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
Removed:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?rev=372050&r1=372049&r2=372050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java Tue Jan 24 15:08:29 2006
@@ -23,6 +23,7 @@
 import org.apache.activemq.broker.MutableBrokerFilter;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -30,6 +31,7 @@
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
@@ -131,6 +133,17 @@
         super.removeProducer(context, info);
         sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
     }
+    
+    public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Throwable {
+        super.addConsumer(context, info);
+        sendAsyncToSlave(info);
+    }
+
+    
+    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Throwable {
+        super.removeSubscription(context, info);
+        sendAsyncToSlave(info);
+    }
       
     
 
@@ -163,6 +176,7 @@
         super.rollbackTransaction(context, xid);
         TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK);
         sendAsyncToSlave(info);
+        
     }
 
     /**
@@ -174,7 +188,7 @@
     public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Throwable{
         super.commitTransaction(context, xid,onePhase);
         TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE);
-        sendAsyncToSlave(info);
+        sendSyncToSlave(info);
     }
 
     /**
@@ -205,26 +219,40 @@
     }
     
     public void send(ConnectionContext context, Message message) throws Throwable{
+        /**
+         * A message can be dispatched before the super.send() method returns
+         * so - here the order is switched to avoid problems on the slave
+         * with receiving acks for messages not received yet
+         */
+        sendToSlave(message);
         super.send(context,message);
-        sendAsyncToSlave(message);
     }
     
    
     public void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable{
         super.acknowledge(context, ack);
-        sendAsyncToSlave(ack);
+        sendToSlave(ack);
     }
     
 
     protected void sendToSlave(Message message){
-        /*
-        if (message.isPersistent()){
+        
+        if (message.isPersistent() && !message.isInTransaction()){
             sendSyncToSlave(message);
         }else{
             sendAsyncToSlave(message);
         }
-        */
-        sendAsyncToSlave(message);
+        
+        
+    }
+    
+    protected void sendToSlave(MessageAck ack){
+       
+        if (ack.isInTransaction()){
+            sendAsyncToSlave(ack);
+        }else{
+            sendSyncToSlave(ack);
+        }
     }
 
     protected void sendAsyncToSlave(Command command){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java?rev=372050&r1=372049&r2=372050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java Tue Jan 24 15:08:29 2006
@@ -233,6 +233,6 @@
     private void shutDown(){
         masterActive.set(false);
         broker.masterFailed();
-        //ServiceSupport.dispose(this);
+        ServiceSupport.dispose(this);
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=372050&r1=372049&r2=372050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Tue Jan 24 15:08:29 2006
@@ -57,6 +57,8 @@
     private Message message;    
     /** The number of times the message has requested being hardened */
     private int referenceCount;
+    /** the size of the message **/
+    private int cachedSize = 0;
     
     /**
      * Only used by the END_OF_BROWSE_MARKER singleton
@@ -69,6 +71,7 @@
         this.groupID = null;
         this.groupSequence = 0;
         this.targetConsumerId=null;
+        this.cachedSize = message != null ? message.getSize() : 0;
     }
 
     public IndirectMessageReference(Destination destination, Message message) {
@@ -81,7 +84,8 @@
         this.targetConsumerId=message.getTargetConsumerId();
         
         this.referenceCount=1;
-        message.incrementReferenceCount();        
+        message.incrementReferenceCount();     
+        this.cachedSize = message != null ? message.getSize() : 0;
     }
     
     synchronized public Message getMessageHardRef() {
@@ -201,5 +205,13 @@
 
     public ConsumerId getTargetConsumerId() {
         return targetConsumerId;
+    }
+
+    public int getSize(){
+       Message msg = message;
+       if (msg != null){
+           return msg.getSize();
+       }
+       return cachedSize;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java?rev=372050&r1=372049&r2=372050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java Tue Jan 24 15:08:29 2006
@@ -46,5 +46,6 @@
     public int incrementReferenceCount();
     public int decrementReferenceCount();
     public ConsumerId getTargetConsumerId();
+    public int getSize();
     
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=372050&r1=372049&r2=372050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Jan 24 15:08:29 2006
@@ -1,18 +1,15 @@
 /**
- *
+ * 
  * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.activemq.broker.region;
 
@@ -29,217 +26,186 @@
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
-
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedList;
-
 /**
  * A subscription that honors the pre-fetch option of the ConsumerInfo.
  * 
  * @version $Revision: 1.15 $
  */
-abstract public class PrefetchSubscription extends AbstractSubscription {
-    static private final Log log = LogFactory.getLog(PrefetchSubscription.class);
-    
-    final protected LinkedList matched = new LinkedList();
-    final protected LinkedList dispatched = new LinkedList();
-    
+abstract public class PrefetchSubscription extends AbstractSubscription{
+    static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
+    final protected LinkedList matched=new LinkedList();
+    final protected LinkedList dispatched=new LinkedList();
     protected int delivered=0;
-    
     int preLoadLimit=1024*100;
     int preLoadSize=0;
     boolean dispatching=false;
-    
-    public PrefetchSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
-        super(broker,context, info);
+
+    public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
+                    throws InvalidSelectorException{
+        super(broker,context,info);
     }
 
-    synchronized public void add(MessageReference node) throws Throwable {
-        if( !isFull()  && !isSlaveBroker()) {
+    synchronized public void add(MessageReference node) throws Throwable{
+        if(!isFull()&&!isSlaveBroker()){
             dispatch(node);
-        } else {
+        }else{
             synchronized(matched){
                 matched.addLast(node);
             }
         }
-        
     }
-    
-    public void processMessageDispatchNotification(MessageDispatchNotification  mdn){
+
+    public void processMessageDispatchNotification(MessageDispatchNotification mdn){
         synchronized(matched){
-            for (Iterator i = matched.iterator(); i.hasNext();){
-                MessageReference node = (MessageReference)i.next();
-                if (node.getMessageId().equals(mdn.getMessageId())){
+            for(Iterator i=matched.iterator();i.hasNext();){
+                MessageReference node=(MessageReference) i.next();
+                if(node.getMessageId().equals(mdn.getMessageId())){
                     i.remove();
-                    try {
-                    MessageDispatch md = createMessageDispatch(node, node.getMessage());
-                    dispatched.addLast(node);
-                    
-                    incrementPreloadSize(node.getMessage().getSize()); 
-                    node.decrementReferenceCount();
+                    try{
+                        MessageDispatch md=createMessageDispatch(node,node.getMessage());
+                        dispatched.addLast(node);
+                        incrementPreloadSize(node.getSize());
+                        node.decrementReferenceCount();
                     }catch(Exception e){
-                        log.error("Problem processing MessageDispatchNotification: " + mdn,e);
+                        log.error("Problem processing MessageDispatchNotification: "+mdn,e);
                     }
                     break;
                 }
             }
         }
     }
-    
-    synchronized public void acknowledge(final ConnectionContext context, final MessageAck ack) throws Throwable {
-        
+
+    synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Throwable{
         // Handle the standard acknowledgment case.
-        boolean wasFull = isFull();
-        if( ack.isStandardAck() ) {
-                        
+        boolean wasFull=isFull();
+        if(ack.isStandardAck()){
             // Acknowledge all dispatched messages up till the message id of the acknowledgment.
             int index=0;
             boolean inAckRange=false;
-            for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
-                final MessageReference node = (MessageReference)iter.next();
-                MessageId messageId = node.getMessageId();
-                
-                if( ack.getFirstMessageId()==null || ack.getFirstMessageId().equals(messageId)) {
-                    inAckRange = true;
+            for(Iterator iter=dispatched.iterator();iter.hasNext();){
+                final MessageReference node=(MessageReference) iter.next();
+                MessageId messageId=node.getMessageId();
+                if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
+                    inAckRange=true;
                 }
-                
-                if( inAckRange ) {
-                
+                if(inAckRange){
                     // Don't remove the nodes until we are committed.
-                    if ( !context.isInTransaction() ) {
+                    if(!context.isInTransaction()){
                         iter.remove();
-                    } else {
+                    }else{
                         // setup a Synchronization to remove nodes from the dispatched list.
                         context.getTransaction().addSynchronization(new Synchronization(){
-                            public void afterCommit() throws Throwable {
-                                synchronized(PrefetchSubscription.this) {
-                                    
+                            public void afterCommit() throws Throwable{
+                                synchronized(PrefetchSubscription.this){
                                     // Now that we are committed, we can remove the nodes.
                                     boolean inAckRange=false;
                                     int index=0;
-                                    for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
-                                        final MessageReference node = (MessageReference)iter.next();
-                                        MessageId messageId = node.getMessageId();
-                                        if( ack.getFirstMessageId()==null || ack.getFirstMessageId().equals(messageId)) {
-                                            inAckRange = true;
+                                    for(Iterator iter=dispatched.iterator();iter.hasNext();){
+                                        final MessageReference node=(MessageReference) iter.next();
+                                        MessageId messageId=node.getMessageId();
+                                        if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
+                                            inAckRange=true;
                                         }
-                                        if( inAckRange ) {
+                                        if(inAckRange){
                                             index++;
                                             iter.remove();
-                                            if( ack.getLastMessageId().equals(messageId)) {
-                                                delivered = Math.max(0, delivered - (index+1));
+                                            if(ack.getLastMessageId().equals(messageId)){
+                                                delivered=Math.max(0,delivered-(index+1));
                                                 return;
                                             }
                                         }
                                     }
-                                    
                                 }
                             }
-                        });                        
+                        });
                     }
-                    
                     index++;
-                    acknowledge(context, ack, node);
-                    if( ack.getLastMessageId().equals(messageId)) {                        
-                        if ( context.isInTransaction() )
-                            delivered = Math.max(delivered,index+1);
-                        else 
-                            delivered = Math.max(0, delivered - (index+1));
-                        
-                        if( wasFull && !isFull() ) {                            
+                    acknowledge(context,ack,node);
+                    if(ack.getLastMessageId().equals(messageId)){
+                        if(context.isInTransaction())
+                            delivered=Math.max(delivered,index+1);
+                        else
+                            delivered=Math.max(0,delivered-(index+1));
+                        if(wasFull&&!isFull()){
                             dispatchMatched();
                         }
                         return;
-                    } else {
-//                        System.out.println("no match: "+ack.getLastMessageId()+","+messageId);
+                    }else{
+                        // System.out.println("no match: "+ack.getLastMessageId()+","+messageId);
                     }
                 }
-                
             }
             log.info("Could not correlate acknowledgment with dispatched message: "+ack);
-            
-        } else if( ack.isDeliveredAck() ) {
-            
+        }else if(ack.isDeliveredAck()){
             // Message was delivered but not acknowledged: update pre-fetch counters.
             // Acknowledge all dispatched messages up till the message id of the acknowledgment.
             int index=0;
-            for (Iterator iter = dispatched.iterator(); iter.hasNext();index++) {
-                final MessageReference node = (MessageReference)iter.next();
-                if( ack.getLastMessageId().equals(node.getMessageId()) ) {
-                    delivered = Math.max(delivered,index+1);
-                    if( wasFull && !isFull() ) {
+            for(Iterator iter=dispatched.iterator();iter.hasNext();index++){
+                final MessageReference node=(MessageReference) iter.next();
+                if(ack.getLastMessageId().equals(node.getMessageId())){
+                    delivered=Math.max(delivered,index+1);
+                    if(wasFull&&!isFull()){
                         dispatchMatched();
                     }
                     return;
                 }
             }
             throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
-            
-        } else if( ack.isPoisonAck() ) {
-            
+        }else if(ack.isPoisonAck()){
             // TODO: what if the message is already in a DLQ???
-            
-            // Handle the poison ACK case: we need to send the message to a DLQ  
-            if( ack.isInTransaction() )
+            // Handle the poison ACK case: we need to send the message to a DLQ
+            if(ack.isInTransaction())
                 throw new JMSException("Poison ack cannot be transacted: "+ack);
-            
             // Acknowledge all dispatched messages up till the message id of the acknowledgment.
             int index=0;
             boolean inAckRange=false;
-            for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
-                final MessageReference node = (MessageReference)iter.next();
-                MessageId messageId = node.getMessageId();
-                
-                if( ack.getFirstMessageId()==null || ack.getFirstMessageId().equals(messageId)) {
-                    inAckRange = true;
+            for(Iterator iter=dispatched.iterator();iter.hasNext();){
+                final MessageReference node=(MessageReference) iter.next();
+                MessageId messageId=node.getMessageId();
+                if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
+                    inAckRange=true;
                 }
-                
-                if( inAckRange ) {
-                
+                if(inAckRange){
                     // Send the message to the DLQ
                     node.incrementReferenceCount();
-                    try {
-                        Message message = node.getMessage();
-                        if( message !=null ) {
-                            
-                            // The original destination and transaction id do not get filled when the message is first sent,
+                    try{
+                        Message message=node.getMessage();
+                        if(message!=null){
+                            // The original destination and transaction id do not get filled when the message is first
+                            // sent,
                             // it is only populated if the message is routed to another destination like the DLQ
-                            if( message.getOriginalDestination()!=null )
+                            if(message.getOriginalDestination()!=null)
                                 message.setOriginalDestination(message.getDestination());
-                            if( message.getOriginalTransactionId()!=null )
+                            if(message.getOriginalTransactionId()!=null)
                                 message.setOriginalTransactionId(message.getTransactionId());
-                            
-                            DeadLetterStrategy deadLetterStrategy = node.getRegionDestination().getDeadLetterStrategy();
-                            ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message.getDestination());
+                            DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
+                            ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message
+                                            .getDestination());
                             message.setDestination(deadLetterDestination);
                             message.setTransactionId(null);
                             message.evictMarshlledForm();
-
-                            boolean originalFlowControl = context.isProducerFlowControl();
-                            try {
+                            boolean originalFlowControl=context.isProducerFlowControl();
+                            try{
                                 context.setProducerFlowControl(false);
-                                context.getBroker().send(context, message);
-                            } finally {
+                                context.getBroker().send(context,message);
+                            }finally{
                                 context.setProducerFlowControl(originalFlowControl);
                             }
-
-                        }                        
-                    } finally {
+                        }
+                    }finally{
                         node.decrementReferenceCount();
                     }
-                    
                     iter.remove();
                     index++;
-                    acknowledge(context, ack, node);
-                    if( ack.getLastMessageId().equals(messageId)) {
-                        
-                        delivered = Math.max(0, delivered - (index+1));
-                        
-                        if( wasFull && !isFull() ) {                            
+                    acknowledge(context,ack,node);
+                    if(ack.getLastMessageId().equals(messageId)){
+                        delivered=Math.max(0,delivered-(index+1));
+                        if(wasFull&&!isFull()){
                             dispatchMatched();
                         }
                         return;
@@ -248,128 +214,115 @@
             }
             throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
         }
-
         throw new JMSException("Invalid acknowledgment: "+ack);
     }
-    
-    protected boolean isFull() {
-        return dispatched.size()-delivered >= info.getPrefetchSize() || preLoadSize > preLoadLimit;
-    }
-    
-    protected void dispatchMatched() throws IOException {
-        if(!dispatching) {
-            dispatching = true;
-            try {
-                for (Iterator iter = matched.iterator(); iter.hasNext() && !isFull();) {
-                    MessageReference node = (MessageReference) iter.next();
+
+    protected boolean isFull(){
+        return dispatched.size()-delivered>=info.getPrefetchSize()||preLoadSize>preLoadLimit;
+    }
+
+    protected void dispatchMatched() throws IOException{
+        if(!dispatching){
+            dispatching=true;
+            try{
+                for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){
+                    MessageReference node=(MessageReference) iter.next();
                     iter.remove();
                     dispatch(node);
                 }
-            } finally {
+            }finally{
                 dispatching=false;
             }
         }
     }
-    
-    
 
-    private void dispatch(final MessageReference node) throws IOException {
+    private void dispatch(final MessageReference node) throws IOException{
         node.incrementReferenceCount();
-        
-        final Message message = node.getMessage();
-        if( message == null ) {
+        final Message message=node.getMessage();
+        if(message==null){
             return;
-        }       
-        
+        }
         // Make sure we can dispatch a message.
-        if( canDispatch(node) && !isSlaveBroker()) {
-
-            MessageDispatch md = createMessageDispatch(node, message);
+        if(canDispatch(node)&&!isSlaveBroker()){
+            MessageDispatch md=createMessageDispatch(node,message);
             dispatched.addLast(node);
-            
-            incrementPreloadSize(node.getMessage().getSize()); 
-            
-            if( info.isDispatchAsync() ) {
+            incrementPreloadSize(node.getMessage().getSize());
+            if(info.isDispatchAsync()){
                 md.setConsumer(new Runnable(){
-                    public void run() {
-                        // Since the message gets queued up in async dispatch, we don't want to 
+                    public void run(){
+                        // Since the message gets queued up in async dispatch, we don't want to
                         // decrease the reference count until it gets put on the wire.
-                        onDispatch(node, message);
+                        onDispatch(node,message);
                     }
                 });
                 context.getConnection().dispatchAsync(md);
-            } else {
+            }else{
                 context.getConnection().dispatchSync(md);
-                onDispatch(node, message);
+                onDispatch(node,message);
             }
             // The onDispatch() does the node.decrementReferenceCount();
-        } else {
+        }else{
             // We were not allowed to dispatch that message (an other consumer grabbed it before we did)
             node.decrementReferenceCount();
         }
-        
     }
 
-    synchronized private void onDispatch(final MessageReference node, final Message message) {
-        
-        boolean wasFull = isFull();
-        decrementPreloadSize(message.getSize());        
+    synchronized private void onDispatch(final MessageReference node,final Message message){
+        boolean wasFull=isFull();
+        decrementPreloadSize(message.getSize());
         node.decrementReferenceCount();
-        
-        if( node.getRegionDestination() !=null ) {
+        if(node.getRegionDestination()!=null){
             node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
             context.getConnection().getStatistics().onMessageDequeue(message);
-            
-            if( wasFull && !isFull() ) {                            
-                try {
+            if(wasFull&&!isFull()){
+                try{
                     dispatchMatched();
-                } catch (IOException e) {
+                }catch(IOException e){
                     context.getConnection().serviceException(e);
                 }
             }
         }
-        
     }
-    
-    private int incrementPreloadSize(int size) {
-        preLoadSize += size;
+
+    private int incrementPreloadSize(int size){
+        preLoadSize+=size;
         return preLoadSize;
     }
-    
-    private int decrementPreloadSize(int size) {
-        preLoadSize -= size;
+
+    private int decrementPreloadSize(int size){
+        preLoadSize-=size;
         return preLoadSize;
     }
-    
 
     /**
      * @param node
-     * @param message TODO
+     * @param message
+     *            TODO
      * @return
      */
-    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
-        MessageDispatch md = new MessageDispatch();
-        md.setConsumerId( info.getConsumerId() );
-        md.setDestination( node.getRegionDestination().getActiveMQDestination() );
+    protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
+        MessageDispatch md=new MessageDispatch();
+        md.setConsumerId(info.getConsumerId());
+        md.setDestination(node.getRegionDestination().getActiveMQDestination());
         md.setMessage(message);
-        md.setRedeliveryCounter( node.getRedeliveryCounter() );
+        md.setRedeliveryCounter(node.getRedeliveryCounter());
         return md;
     }
-    
+
     /**
      * Use when a matched message is about to be dispatched to the client.
      * 
      * @param node
-     * @return false if the message should not be dispatched to the client (another sub may have already dispatched it for example).
+     * @return false if the message should not be dispatched to the client (another sub may have already dispatched it
+     *         for example).
      */
     abstract protected boolean canDispatch(MessageReference node);
-    
+
     /**
      * Used during acknowledgment to remove the message.
-     * @throws IOException 
+     * 
+     * @throws IOException
      */
-    protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException {        
-    }
-
-
+    protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node)
+                    throws IOException{}
 }

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java?rev=372050&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java Tue Jan 24 15:08:29 2006
@@ -0,0 +1,93 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.springframework.core.io.ClassPathResource;
+
+/**
+ * Tests failover for queues: clients connect through a failover URI naming two brokers and the
+ * master broker is stopped once {@code failureCount} sent messages have been counted.
+ */
+public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsTest{
+
+    protected BrokerService master;
+    protected BrokerService slave;
+    protected int inflightMessageCount = 0;
+    protected int failureCount = 50;
+    protected String uriString="failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false";
+
+    /**
+     * Builds the master and slave brokers from their classpath XML configurations, starts both and
+     * pauses for a second before running the inherited set up.
+     */
+    protected void setUp() throws Exception{
+        failureCount = super.messageCount/2;
+        super.topic = isTopic();
+        BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/master.xml"));
+        brokerFactory.afterPropertiesSet();
+        master=brokerFactory.getBroker();
+        brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/slave.xml"));
+        brokerFactory.afterPropertiesSet();
+        slave=brokerFactory.getBroker();
+        master.start();
+        slave.start();
+        // wait for things to connect
+        Thread.sleep(1000);
+        super.setUp();
+    }
+
+    /**
+     * Runs the inherited tear down and then stops both brokers. The nested finally blocks make sure
+     * each broker is still stopped when an earlier step throws, so no broker is left running.
+     */
+    protected void tearDown() throws Exception{
+        try{
+            super.tearDown();
+        }finally{
+            try{
+                slave.stop();
+            }finally{
+                master.stop();
+            }
+        }
+    }
+
+    /** Creates a connection factory for the failover URI held in {@code uriString}. */
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
+        return new ActiveMQConnectionFactory(uriString);
+    }
+
+    /**
+     * Counts one sent message and, when the count reaches {@code failureCount}, resets it and stops
+     * the master broker. NOTE(review): presumably invoked by the superclass per send - confirm.
+     */
+    protected void messageSent() throws Exception{
+        if (++inflightMessageCount >= failureCount){
+            inflightMessageCount = 0;
+            master.stop();
+        }
+    }
+
+    /** Returns false; setUp copies this value into the inherited {@code topic} field. */
+    protected boolean isTopic(){
+        return false;
+    }
+}