You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/04/13 22:15:42 UTC

svn commit: r393912 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/ft/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/active...

Author: rajdavies
Date: Thu Apr 13 13:15:35 2006
New Revision: 393912

URL: http://svn.apache.org/viewcvs?rev=393912&view=rev
Log:
fine tuning, client control commands etc.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionControlMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerControlMarshaller.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionControlTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerControlTest.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BrokerInfoMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionInfoMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerInfoMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Apr 13 13:15:35 2006
@@ -51,9 +51,11 @@
 import org.apache.activemq.command.ActiveMQTempTopic;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.command.ConnectionError;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.ControlCommand;
@@ -74,6 +76,7 @@
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.failover.FailoverTransport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.JMSExceptionSupport;
@@ -163,6 +166,7 @@
     protected ActiveMQConnection(Transport transport, JMSStatsImpl factoryStats)
             throws Exception {
         this.info = new ConnectionInfo(new ConnectionId(connectionIdGenerator.generateId()));
+        this.info.setManageable(true);
         this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
         
         this.transport = transport;
@@ -1206,8 +1210,7 @@
         // broker without having to do an RPC to the broker.
         
         ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1),consumerIdGenerator.getNextSequenceId());
-        advisoryConsumer = new AdvisoryConsumer(this, consumerId);
-        
+        advisoryConsumer = new AdvisoryConsumer(this, consumerId);        
     }
 
     /**
@@ -1407,12 +1410,17 @@
             } else if ( command.isBrokerInfo() ) {
                 this.brokerInfo = (BrokerInfo)command;
                 brokerInfoReceived.countDown();
+                this.optimizeAcknowledge &= !this.brokerInfo.isFaultTolerantConfiguration();
             }
             else if (command instanceof ControlCommand) {
                 onControlCommand((ControlCommand) command);
             }
             else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
                 onAsyncException(((ConnectionError)command).getException());
+            }else if (command instanceof ConnectionControl){
+                onConnectionControl((ConnectionControl) command);
+            }else if (command instanceof ConsumerControl){
+                onConsumerControl((ConsumerControl) command);
             }
         }
         for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
@@ -1451,6 +1459,10 @@
     }
     
     public void transportInterupted() {
+        for (Iterator i = this.sessions.iterator(); i.hasNext();) {
+            ActiveMQSession s = (ActiveMQSession) i.next();
+            s.clearMessagesInProgress();
+        }
         for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
             TransportListener listener = (TransportListener) iter.next();
             listener.transportInterupted();
@@ -1462,6 +1474,10 @@
             TransportListener listener = (TransportListener) iter.next();
             listener.transportResumed();
         }
+        for (Iterator i = this.sessions.iterator(); i.hasNext();) {
+            ActiveMQSession s = (ActiveMQSession) i.next();
+            s.deliverAcks();
+        }
     }
 
 
@@ -1713,6 +1729,30 @@
             if (text.equals("shutdown")) {
                 log.info("JVM told to shutdown");
                 System.exit(0);
+            }
+        }
+    }
+    
+    protected void onConnectionControl(ConnectionControl command){
+        if (command.isFaultTolerant()){
+            this.optimizeAcknowledge = false;
+            for(Iterator i=this.sessions.iterator();i.hasNext();){
+                ActiveMQSession s=(ActiveMQSession) i.next();
+                s.setOptimizeAcknowledge(false);
+            }
+        }
+    }
+    
+    protected void onConsumerControl(ConsumerControl command){
+        if(command.isClose()){
+            for(Iterator i=this.sessions.iterator();i.hasNext();){
+                ActiveMQSession s=(ActiveMQSession) i.next();
+                s.close(command.getConsumerId());
+            }
+        }else{
+            for(Iterator i=this.sessions.iterator();i.hasNext();){
+                ActiveMQSession s=(ActiveMQSession) i.next();
+                s.setPrefetchSize(command.getConsumerId(),command.getPrefetch());
             }
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Apr 13 13:15:35 2006
@@ -111,8 +111,8 @@
     private MessageAvailableListener availableListener;
 
     private RedeliveryPolicy redeliveryPolicy;
-    private boolean optimizeAcknowledge;
-
+    private AtomicBoolean optimizeAcknowledge = new AtomicBoolean();
+   
     /**
      * Create a MessageConsumer
      * 
@@ -182,6 +182,9 @@
         }
 
         this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
+        this.optimizeAcknowledge.set(session.connection.isOptimizeAcknowledge()&&session.isAutoAcknowledge()
+                        &&!info.isBrowser());
+        this.info.setOptimizedAcknowledge(this.optimizeAcknowledge.get());
         try {
             this.session.addConsumer(this);
             this.session.syncSendPacket(info);
@@ -189,8 +192,7 @@
             this.session.removeConsumer(this);
             throw e;
         }
-        this.optimizeAcknowledge=session.connection.isOptimizeAcknowledge()&&session.isAutoAcknowledge()
-                        &&!info.isDurable()&&!info.getDestination().isQueue();
+        
         if(session.connection.isStarted())
             start();
     }
@@ -509,15 +511,34 @@
         }
     }
     
-    public void clearMessagesInProgress(){
+    void clearMessagesInProgress(){
         unconsumedMessages.clear();
     }
+    
+    void deliverAcks(){
+        synchronized(optimizeAcknowledge){
+            if(this.optimizeAcknowledge.get()){
+                if(!deliveredMessages.isEmpty()){
+                    MessageDispatch md=(MessageDispatch) deliveredMessages.getFirst();
+                    MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+                    try{
+                        session.asyncSendPacket(ack);
+                    }catch(JMSException e){
+                        log.error("Failed to delivered acknowledgements",e);
+                    }
+                    deliveredMessages.clear();
+                    ackCounter=0;
+                }
+            }
+        }
+    }
 
     public void dispose() throws JMSException {
         if (!unconsumedMessages.isClosed()) {
             // Do we have any acks we need to send out before closing?
             // Ack any delivered messages now. (session may still
             // commit/rollback the acks).
+            deliverAcks();//only processes optimized acknowledgements
             if ((session.isTransacted() || session.isDupsOkAcknowledge())) {
                 acknowledge();
             }
@@ -539,6 +560,18 @@
     protected void checkMessageListener() throws JMSException {
         session.checkMessageListener();
     }
+    
+    protected void setOptimizeAcknowledge(boolean value){
+        synchronized(optimizeAcknowledge){
+            deliverAcks();
+            optimizeAcknowledge.set(value);
+        }
+    }
+    
+    protected void setPrefetchSize(int prefetch){
+        deliverAcks();
+        this.info.setPrefetchSize(prefetch);
+    }
 
     private void beforeMessageIsConsumed(MessageDispatch md) {
         md.setDeliverySequenceId(session.getNextDeliveryId());
@@ -557,18 +590,20 @@
                 ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
             }else if(session.isAutoAcknowledge()){
                 if(!deliveredMessages.isEmpty()){
-                    if(this.optimizeAcknowledge){
-                        ackCounter++;
-                        if(ackCounter>=(info.getPrefetchSize()*.75)){
-                            MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,ackCounter);
+                    synchronized(optimizeAcknowledge){
+                        if(this.optimizeAcknowledge.get()){
+                            ackCounter++;
+                            if(ackCounter>=(info.getPrefetchSize()*.75)){
+                                MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,ackCounter);
+                                session.asyncSendPacket(ack);
+                                ackCounter=0;
+                                deliveredMessages.clear();
+                            }
+                        }else{
+                            MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
                             session.asyncSendPacket(ack);
-                            ackCounter=0;
                             deliveredMessages.clear();
                         }
-                    }else{
-                        MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
-                        session.asyncSendPacket(ack);
-                        deliveredMessages.clear();
                     }
                 }
             }else if(session.isDupsOkAcknowledge()){
@@ -662,11 +697,12 @@
 
     public void rollback() throws JMSException{
         synchronized(unconsumedMessages.getMutex()){
-            if(optimizeAcknowledge){
-               
-                // remove messages read but not acked at the broker yet through optimizeAcknowledge  
-                for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
-                    deliveredMessages.removeLast();
+            synchronized(optimizeAcknowledge){
+                if(optimizeAcknowledge.get()){
+                    // remove messages read but not acked at the broker yet through optimizeAcknowledge
+                    for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
+                        deliveredMessages.removeLast();
+                    }
                 }
             }
             if(deliveredMessages.isEmpty())

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu Apr 13 13:15:35 2006
@@ -517,6 +517,21 @@
             connection.asyncSendPacket(info.createRemoveCommand());
         }
     }
+    
+    void clearMessagesInProgress(){
+        executor.clearMessagesInProgress();
+        for (Iterator iter = consumers.iterator(); iter.hasNext();) {
+            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) iter.next();
+            consumer.clearMessagesInProgress();
+        }
+    }
+    
+    void deliverAcks(){
+        for (Iterator iter = consumers.iterator(); iter.hasNext();) {
+            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) iter.next();
+            consumer.deliverAcks();
+        }
+    }
 
     synchronized public void dispose() throws JMSException {
         if (!closed) {
@@ -1704,6 +1719,38 @@
             ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
             if( consumer.getMessageListener()!=null ) {
                 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
+            }
+        }
+    }
+    
+    protected void setOptimizeAcknowledge(boolean value){
+        for (Iterator iter = consumers.iterator(); iter.hasNext();) {
+            ActiveMQMessageConsumer c = (ActiveMQMessageConsumer) iter.next();
+            c.setOptimizeAcknowledge(value);
+        }
+    }
+    
+    protected void setPrefetchSize(ConsumerId id,int prefetch){
+        for(Iterator iter=consumers.iterator();iter.hasNext();){
+            ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next();
+            if(c.getConsumerId().equals(id)){
+                c.setPrefetchSize(prefetch);
+                break;
+            }
+        }
+    }
+    
+    protected void close(ConsumerId id){
+        for(Iterator iter=consumers.iterator();iter.hasNext();){
+            ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next();
+            if(c.getConsumerId().equals(id)){
+                try{
+                    c.close();
+                }catch(JMSException e){
+                    log.warn("Exception closing consumer",e);
+                }
+                log.warn("Closed consumer on Command");
+                break;
             }
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Thu Apr 13 13:15:35 2006
@@ -234,4 +234,8 @@
     public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception;
 
     
+    /**
+     * @return true if fault tolerant
+     */
+    public boolean isFaultTolerantConfiguration();
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Thu Apr 13 13:15:35 2006
@@ -205,4 +205,9 @@
         
     }
 
+
+    public boolean isFaultTolerantConfiguration(){
+        return next.isFaultTolerantConfiguration();
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java Thu Apr 13 13:15:35 2006
@@ -90,5 +90,10 @@
      * Returns the statistics for this connection
      */
     public ConnectionStatistics getStatistics();
+    
+    /**
+     * @return true if the Connection will process control commands
+     */
+    public boolean isManageable();
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Thu Apr 13 13:15:35 2006
@@ -198,6 +198,10 @@
 
     public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{        
     }
+
+    public boolean isFaultTolerantConfiguration(){
+        return false;
+    }
     
    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Thu Apr 13 13:15:35 2006
@@ -200,6 +200,10 @@
         throw new IllegalStateException(this.message);
         
     }
+
+    public boolean isFaultTolerantConfiguration(){
+        throw new IllegalStateException(this.message);
+    }
     
    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Thu Apr 13 13:15:35 2006
@@ -215,4 +215,8 @@
         
     }
 
+    public boolean isFaultTolerantConfiguration(){
+       return getNext().isFaultTolerantConfiguration();
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java Thu Apr 13 13:15:35 2006
@@ -18,11 +18,13 @@
 
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.broker.Connection;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.InsertableMutableBrokerFilter;
 import org.apache.activemq.broker.MutableBrokerFilter;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.ExceptionResponse;
@@ -67,11 +69,26 @@
      */
     public void startProcessing(){
         started.set(true);
+        try{
+            Connection[] connections=getClients();
+            ConnectionControl command=new ConnectionControl();
+            command.setFaultTolerant(true);
+            if(connections!=null){
+                for(int i=0;i<connections.length;i++){
+                    if(connections[i].isActive()&&connections[i].isManageable()){
+                        connections[i].dispatchAsync(command);
+                    }
+                }
+            }
+        }catch(Exception e){
+            log.error("Failed to get Connections",e);
+        }
     }
 
     /**
      * stop the broker
-     * @throws Exception 
+     * 
+     * @throws Exception
      */
     public void stop() throws Exception{
         super.stop();
@@ -301,6 +318,10 @@
         sendToSlave(ack);
         super.acknowledge(context, ack);
        
+    }
+    
+    public boolean isFaultTolerantConfiguration(){
+        return true;
     }
     
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java Thu Apr 13 13:15:35 2006
@@ -131,6 +131,9 @@
         connectionInfo.setUserName(userName);
         connectionInfo.setPassword(password);
         localBroker.oneway(connectionInfo);
+        ConnectionInfo remoteInfo=new ConnectionInfo();
+        connectionInfo.copy(remoteInfo);
+        remoteInfo.setBrokerMasterConnector(true);
         remoteBroker.oneway(connectionInfo);
 
         sessionInfo=new SessionInfo(connectionInfo,1);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Apr 13 13:15:35 2006
@@ -470,6 +470,10 @@
     public Set getDurableDestinations(){
         return adaptor != null ? adaptor.getDestinations() : Collections.EMPTY_SET;
     }
+    
+    public boolean isFaultTolerantConfiguration(){
+        return false;
+    }
 
 
     protected void doStop(ServiceStopper ss) {
@@ -486,6 +490,8 @@
     public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
         this.keepDurableSubsActive = keepDurableSubsActive;
     }
+
+    
 
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java Thu Apr 13 13:15:35 2006
@@ -1,105 +1,132 @@
 /**
- *
+ * 
  * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.activemq.command;
 
 import org.apache.activemq.state.CommandVisitor;
-
-
 /**
- * When a client connects to a broker, the broker send the client a BrokerInfo
- * so that the client knows which broker node he's talking to and also any peers
- * that the node has in his cluster.  This is the broker helping the client out
+ * When a client connects to a broker, the broker send the client a BrokerInfo so that the client knows which broker
+ * node he's talking to and also any peers that the node has in his cluster. This is the broker helping the client out
  * in discovering other nodes in the cluster.
  * 
  * @openwire:marshaller code="2"
  * @version $Revision: 1.7 $
  */
-public class BrokerInfo extends BaseCommand {
-
+public class BrokerInfo extends BaseCommand{
     public static final byte DATA_STRUCTURE_TYPE=CommandTypes.BROKER_INFO;
     BrokerId brokerId;
     String brokerURL;
     boolean slaveBroker;
-    
+    boolean masterBroker;
+    boolean faultTolerantConfiguration;
     BrokerInfo peerBrokerInfos[];
     String brokerName;
-	
-    public boolean isBrokerInfo() {
+
+    public boolean isBrokerInfo(){
         return true;
     }
 
-    public byte getDataStructureType() {
+    public byte getDataStructureType(){
         return DATA_STRUCTURE_TYPE;
     }
-    
+
     /**
      * @openwire:property version=1 cache=true
      */
-    public BrokerId getBrokerId() {
+    public BrokerId getBrokerId(){
         return brokerId;
     }
-    public void setBrokerId(BrokerId brokerId) {
-        this.brokerId = brokerId;
+
+    public void setBrokerId(BrokerId brokerId){
+        this.brokerId=brokerId;
     }
 
     /**
      * @openwire:property version=1
      */
-    public String getBrokerURL() {
+    public String getBrokerURL(){
         return brokerURL;
     }
-    public void setBrokerURL(String brokerURL) {
-        this.brokerURL = brokerURL;
+
+    public void setBrokerURL(String brokerURL){
+        this.brokerURL=brokerURL;
     }
 
     /**
      * @openwire:property version=1 testSize=0
      */
-    public BrokerInfo[] getPeerBrokerInfos() {
+    public BrokerInfo[] getPeerBrokerInfos(){
         return peerBrokerInfos;
     }
-    public void setPeerBrokerInfos(BrokerInfo[] peerBrokerInfos) {
-        this.peerBrokerInfos = peerBrokerInfos;
+
+    public void setPeerBrokerInfos(BrokerInfo[] peerBrokerInfos){
+        this.peerBrokerInfos=peerBrokerInfos;
     }
 
     /**
      * @openwire:property version=1
      */
-    public String getBrokerName() {
+    public String getBrokerName(){
         return brokerName;
     }
-    public void setBrokerName(String brokerName) {
-        this.brokerName = brokerName;
+
+    public void setBrokerName(String brokerName){
+        this.brokerName=brokerName;
     }
-	
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processBrokerInfo( this );
+
+    public Response visit(CommandVisitor visitor) throws Exception{
+        return visitor.processBrokerInfo(this);
     }
 
     /**
-     * @openwire:property version=1 cache=true
+     * @openwire:property version=1
      */
     public boolean isSlaveBroker(){
         return slaveBroker;
     }
 
-   
     public void setSlaveBroker(boolean slaveBroker){
         this.slaveBroker=slaveBroker;
     }
 
+    /**
+     * @openwire:property version=1
+     */
+    public boolean isMasterBroker(){
+        return masterBroker;
+    }
+
+    /**
+     * @param masterBroker
+     *            The masterBroker to set.
+     */
+    public void setMasterBroker(boolean masterBroker){
+        this.masterBroker=masterBroker;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return Returns the faultTolerantConfiguration.
+     */
+    public boolean isFaultTolerantConfiguration(){
+        return faultTolerantConfiguration;
+    }
+
+    /**
+     * @param faultTolerantConfiguration
+     *            The faultTolerantConfiguration to set.
+     */
+    public void setFaultTolerantConfiguration(boolean faultTolerantConfiguration){
+        this.faultTolerantConfiguration=faultTolerantConfiguration;
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java Thu Apr 13 13:15:35 2006
@@ -47,6 +47,8 @@
     byte  CONTROL_COMMAND                   = 14;
     byte  FLUSH_COMMAND                     = 15;
     byte  CONNECTION_ERROR                  = 16;
+    byte CONSUMER_CONTROL                   = 17;
+    byte CONNECTION_CONTROL                 = 18;
     
     ///////////////////////////////////////////////////
     //
@@ -124,6 +126,11 @@
     byte  BOOLEAN_TYPE                      = 78;
     byte  BYTE_ARRAY_TYPE                   = 79;
     
+   
+    
+   
+    
+    
     ///////////////////////////////////////////////////
     //
     // Broker to Broker command objects
@@ -133,6 +140,7 @@
     byte  MESSAGE_DISPATCH_NOTIFICATION     = 90;
     byte  NETWORK_BRIDGE_FILTER             = 91;
     
+    
     ///////////////////////////////////////////////////
     //
     // Data structures contained in the command objects.
@@ -153,6 +161,9 @@
     byte  PRODUCER_ID                       = 123;
     byte  BROKER_ID                         = 124;
     
+    
+    
+   
    
 
 

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java?rev=393912&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java Thu Apr 13 13:15:35 2006
@@ -0,0 +1,119 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+/**
+ * Used to start and stop transports as well as terminating clients.
+ * 
+ * @openwire:marshaller code="18"
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class ConnectionControl extends BaseCommand{
+    public static final byte DATA_STRUCTURE_TYPE=CommandTypes.CONNECTION_CONTROL;
+    protected boolean suspend;
+    protected boolean resume;
+    protected boolean close;
+    protected boolean exit;
+    protected boolean faultTolerant;
+
+    public byte getDataStructureType(){
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception{
+        return null;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return Returns the close.
+     */
+    public boolean isClose(){
+        return close;
+    }
+
+    /**
+     * @param close
+     *            The close to set.
+     */
+    public void setClose(boolean close){
+        this.close=close;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return Returns the exit.
+     */
+    public boolean isExit(){
+        return exit;
+    }
+
+    /**
+     * @param exit
+     *            The exit to set.
+     */
+    public void setExit(boolean exit){
+        this.exit=exit;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return Returns the faultTolerant.
+     */
+    public boolean isFaultTolerant(){
+        return faultTolerant;
+    }
+
+    /**
+     * @param faultTolerant
+     *            The faultTolerant to set.
+     */
+    public void setFaultTolerant(boolean faultTolerant){
+        this.faultTolerant=faultTolerant;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return Returns the resume.
+     */
+    public boolean isResume(){
+        return resume;
+    }
+
+    /**
+     * @param resume
+     *            The resume to set.
+     */
+    public void setResume(boolean resume){
+        this.resume=resume;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return Returns the suspend.
+     */
+    public boolean isSuspend(){
+        return suspend;
+    }
+
+    /**
+     * @param suspend
+     *            The suspend to set.
+     */
+    public void setSuspend(boolean suspend){
+        this.suspend=suspend;
+    }
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java Thu Apr 13 13:15:35 2006
@@ -33,6 +33,8 @@
     protected String userName;
     protected String password;
     protected BrokerId[] brokerPath;
+    protected boolean brokerMasterConnector;
+    protected boolean manageable;
     
     public ConnectionInfo() {        
     }    
@@ -43,6 +45,16 @@
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
+    
+    public void copy(ConnectionInfo copy) {
+        super.copy(copy);
+        copy.clientId = clientId;
+        copy.userName = userName;
+        copy.password = password;
+        copy.brokerPath = brokerPath;
+        copy.brokerMasterConnector = brokerMasterConnector;
+        copy.manageable = manageable;
+    } 
 
     /**
      * @openwire:property version=1 cache=true
@@ -104,6 +116,30 @@
     
     public Response visit(CommandVisitor visitor) throws Exception {
         return visitor.processAddConnection( this );
+    }
+    /**
+     * @openwire:property version=1
+     */
+    public boolean isBrokerMasterConnector(){
+        return brokerMasterConnector;
+    }
+    /**
+     * @param slaveBroker The brokerMasterConnector to set.
+     */
+    public void setBrokerMasterConnector(boolean slaveBroker){
+        this.brokerMasterConnector=slaveBroker;
+    }
+    /**
+     * @openwire:property version=1
+     */
+    public boolean isManageable(){
+        return manageable;
+    }
+    /**
+     * @param manageable The manageable to set.
+     */
+    public void setManageable(boolean manageable){
+        this.manageable=manageable;
     }
 
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java?rev=393912&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java Thu Apr 13 13:15:35 2006
@@ -0,0 +1,115 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Control command addressed to a consumer: carries the consumer id, a close flag and a prefetch value.
+ * 
+ * @openwire:marshaller code="17"
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class ConsumerControl extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_CONTROL;
+
+    protected ConsumerId consumerId;
+    protected boolean close;
+    protected int prefetch;
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    
+
+   
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+    return null;
+    }
+
+
+
+
+
+    /**
+     * @openwire:property version=1
+     * @return Returns the close.
+     */
+    public boolean isClose(){
+        return close;
+    }
+
+
+
+
+
+    /**
+     * @param close The close to set.
+     */
+    public void setClose(boolean close){
+        this.close=close;
+    }
+
+
+
+
+
+    /**
+     * @openwire:property version=1
+     * @return Returns the consumerId.
+     */
+    public ConsumerId getConsumerId(){
+        return consumerId;
+    }
+
+
+
+
+
+    /**
+     * @param consumerId The consumerId to set.
+     */
+    public void setConsumerId(ConsumerId consumerId){
+        this.consumerId=consumerId;
+    }
+
+
+
+
+
+    /**
+     * @openwire:property version=1
+     * @return Returns the prefetch.
+     */
+    public int getPrefetch(){
+        return prefetch;
+    }
+
+
+
+
+
+    /**
+     * @param prefetch The prefetch to set.
+     */
+    public void setPrefetch(int prefetch){
+        this.prefetch=prefetch;
+    }
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java Thu Apr 13 13:15:35 2006
@@ -48,6 +48,7 @@
     protected boolean retroactive;
     protected byte priority;
     protected BrokerId[] brokerPath;
+    protected boolean optimizedAcknowledge;
     
     protected BooleanExpression additionalPredicate;
     protected transient boolean networkSubscription; //this subscription originated from a network connection
@@ -304,6 +305,21 @@
      */
     public void setNetworkSubscription(boolean networkSubscription){
         this.networkSubscription=networkSubscription;
+    }
+
+    /**
+     *  @openwire:property version=1
+     * @return Returns the optimizedAcknowledge.
+     */
+    public boolean isOptimizedAcknowledge(){
+        return optimizedAcknowledge;
+    }
+
+    /**
+     * @param optimizedAcknowledge The optimizedAcknowledge to set.
+     */
+    public void setOptimizedAcknowledge(boolean optimizedAcknowledge){
+        this.optimizedAcknowledge=optimizedAcknowledge;
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BrokerInfoMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BrokerInfoMarshaller.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BrokerInfoMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BrokerInfoMarshaller.java Thu Apr 13 13:15:35 2006
@@ -81,6 +81,8 @@
         }
         info.setBrokerName(tightUnmarshalString(dataIn, bs));
         info.setSlaveBroker(bs.readBoolean());
+        info.setMasterBroker(bs.readBoolean());
+        info.setFaultTolerantConfiguration(bs.readBoolean());
 
     }
 
@@ -98,6 +100,8 @@
         rc += tightMarshalObjectArray1(wireFormat, info.getPeerBrokerInfos(), bs);
         rc += tightMarshalString1(info.getBrokerName(), bs);
         bs.writeBoolean(info.isSlaveBroker());
+        bs.writeBoolean(info.isMasterBroker());
+        bs.writeBoolean(info.isFaultTolerantConfiguration());
 
         return rc + 0;
     }
@@ -118,6 +122,8 @@
         tightMarshalObjectArray2(wireFormat, info.getPeerBrokerInfos(), dataOut, bs);
         tightMarshalString2(info.getBrokerName(), dataOut, bs);
         bs.readBoolean();
+        bs.readBoolean();
+        bs.readBoolean();
 
     }
 
@@ -148,6 +154,8 @@
         }
         info.setBrokerName(looseUnmarshalString(dataIn));
         info.setSlaveBroker(dataIn.readBoolean());
+        info.setMasterBroker(dataIn.readBoolean());
+        info.setFaultTolerantConfiguration(dataIn.readBoolean());
 
     }
 
@@ -165,6 +173,8 @@
         looseMarshalObjectArray(wireFormat, info.getPeerBrokerInfos(), dataOut);
         looseMarshalString(info.getBrokerName(), dataOut);
         dataOut.writeBoolean(info.isSlaveBroker());
+        dataOut.writeBoolean(info.isMasterBroker());
+        dataOut.writeBoolean(info.isFaultTolerantConfiguration());
 
     }
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionControlMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionControlMarshaller.java?rev=393912&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionControlMarshaller.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionControlMarshaller.java Thu Apr 13 13:15:35 2006
@@ -0,0 +1 @@
+/**
 *
 * Copyright 2005-2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.openwire.v1;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;



/**
 * Marshalling code for Open Wire Format for ConnectionControlMarshaller
 *
 *
 * NOTE!: This file is auto generated - do not modify!
 *        if you need to make a change, please modify the groovy scripts
 *        under src/gram/script and then use maven openwire:generate to regenerate
 *        this file.
 *
 * @version $Revision$
 */
public class ConnectionControlMarshaller extends BaseCommandMarshaller {

    /**
     * Return the type of Data Structure we marshal
     * @return short representation of the type data structure
     */
    public byte getDataStructureType() {
        return ConnectionControl.DATA_STRUCTURE_TYPE;
    }
    
    /**
     * @return a new object instance
     */
    public DataStructure createObject() {
        return new ConnectionControl();
    }

    /**
     * Un-marshal an object instance from the data input stream
     *
     * @param o the object to un-marshal
     * @param dataIn the data input stream to build the object from
     * @throws IOException
     */
    public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn, BooleanStream
  bs) throws IOException {
        super.tightUnmarshal(wireFormat, o, dataIn, bs);

        ConnectionControl info = (ConnectionControl)o;
        info.setClose(bs.readBoolean());
        info.setExit(bs.readBoolean());
        info.setFaultTolerant(bs.readBoolean());
        info.setResume(bs.readBoolean());
        info.setSuspend(bs.readBoolean());

    }


    /**
     * Write the booleans that this object uses to a BooleanStream
     */
    public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {

        ConnectionControl info = (ConnectionControl)o;

        int rc = super.tightMarshal1(wireFormat, o, bs);
        bs.writeBoolean(info.isClose());
        bs.writeBoolean(info.isExit());
        bs.writeBoolean(info.isFaultTolerant());
        bs.writeBoolean(info.isResume());
        bs.writeBoolean(info.isSuspend());

        return rc + 0;
    }

    /**
     * Write a object instance to data output stream
     *
     * @pa
 ram o the instance to be marshaled
     * @param dataOut the output stream
     * @throws IOException thrown if an error occurs
     */
    public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut, BooleanStream bs) throws IOException {
        super.tightMarshal2(wireFormat, o, dataOut, bs);

        ConnectionControl info = (ConnectionControl)o;
        bs.readBoolean();
        bs.readBoolean();
        bs.readBoolean();
        bs.readBoolean();
        bs.readBoolean();

    }

    /**
     * Un-marshal an object instance from the data input stream
     *
     * @param o the object to un-marshal
     * @param dataIn the data input stream to build the object from
     * @throws IOException
     */
    public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn) throws IOException {
        super.looseUnmarshal(wireFormat, o, dataIn);

        ConnectionControl info = (ConnectionControl)o;
        info.setClose(dat
 aIn.readBoolean());
        info.setExit(dataIn.readBoolean());
        info.setFaultTolerant(dataIn.readBoolean());
        info.setResume(dataIn.readBoolean());
        info.setSuspend(dataIn.readBoolean());

    }


    /**
     * Write the booleans that this object uses to a BooleanStream
     */
    public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut) throws IOException {

        ConnectionControl info = (ConnectionControl)o;

        super.looseMarshal(wireFormat, o, dataOut);
        dataOut.writeBoolean(info.isClose());
        dataOut.writeBoolean(info.isExit());
        dataOut.writeBoolean(info.isFaultTolerant());
        dataOut.writeBoolean(info.isResume());
        dataOut.writeBoolean(info.isSuspend());

    }
}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionInfoMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionInfoMarshaller.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionInfoMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionInfoMarshaller.java Thu Apr 13 13:15:35 2006
@@ -81,6 +81,8 @@
         else {
             info.setBrokerPath(null);
         }
+        info.setBrokerMasterConnector(bs.readBoolean());
+        info.setManageable(bs.readBoolean());
 
     }
 
@@ -98,6 +100,8 @@
         rc += tightMarshalString1(info.getPassword(), bs);
         rc += tightMarshalString1(info.getUserName(), bs);
         rc += tightMarshalObjectArray1(wireFormat, info.getBrokerPath(), bs);
+        bs.writeBoolean(info.isBrokerMasterConnector());
+        bs.writeBoolean(info.isManageable());
 
         return rc + 0;
     }
@@ -118,6 +122,8 @@
         tightMarshalString2(info.getPassword(), dataOut, bs);
         tightMarshalString2(info.getUserName(), dataOut, bs);
         tightMarshalObjectArray2(wireFormat, info.getBrokerPath(), dataOut, bs);
+        bs.readBoolean();
+        bs.readBoolean();
 
     }
 
@@ -148,6 +154,8 @@
         else {
             info.setBrokerPath(null);
         }
+        info.setBrokerMasterConnector(dataIn.readBoolean());
+        info.setManageable(dataIn.readBoolean());
 
     }
 
@@ -165,6 +173,8 @@
         looseMarshalString(info.getPassword(), dataOut);
         looseMarshalString(info.getUserName(), dataOut);
         looseMarshalObjectArray(wireFormat, info.getBrokerPath(), dataOut);
+        dataOut.writeBoolean(info.isBrokerMasterConnector());
+        dataOut.writeBoolean(info.isManageable());
 
     }
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerControlMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerControlMarshaller.java?rev=393912&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerControlMarshaller.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerControlMarshaller.java Thu Apr 13 13:15:35 2006
@@ -0,0 +1 @@
+/**
 *
 * Copyright 2005-2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.openwire.v1;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;



/**
 * Marshalling code for Open Wire Format for ConsumerControlMarshaller
 *
 *
 * NOTE!: This file is auto generated - do not modify!
 *        if you need to make a change, please modify the groovy scripts
 *        under src/gram/script and then use maven openwire:generate to regenerate
 *        this file.
 *
 * @version $Revision$
 */
public class ConsumerControlMarshaller extends BaseCommandMarshaller {

    /**
     * Return the type of Data Structure we marshal
     * @return short representation of the type data structure
     */
    public byte getDataStructureType() {
        return ConsumerControl.DATA_STRUCTURE_TYPE;
    }
    
    /**
     * @return a new object instance
     */
    public DataStructure createObject() {
        return new ConsumerControl();
    }

    /**
     * Un-marshal an object instance from the data input stream
     *
     * @param o the object to un-marshal
     * @param dataIn the data input stream to build the object from
     * @throws IOException
     */
    public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn, BooleanStream bs) thr
 ows IOException {
        super.tightUnmarshal(wireFormat, o, dataIn, bs);

        ConsumerControl info = (ConsumerControl)o;
        info.setClose(bs.readBoolean());
        info.setConsumerId((org.apache.activemq.command.ConsumerId) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
        info.setPrefetch(dataIn.readInt());

    }


    /**
     * Write the booleans that this object uses to a BooleanStream
     */
    public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {

        ConsumerControl info = (ConsumerControl)o;

        int rc = super.tightMarshal1(wireFormat, o, bs);
        bs.writeBoolean(info.isClose());
        rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getConsumerId(), bs);

        return rc + 4;
    }

    /**
     * Write a object instance to data output stream
     *
     * @param o the instance to be marshaled
     * @param dataOut the output stream
     * @throws IOException thro
 wn if an error occurs
     */
    public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut, BooleanStream bs) throws IOException {
        super.tightMarshal2(wireFormat, o, dataOut, bs);

        ConsumerControl info = (ConsumerControl)o;
        bs.readBoolean();
        tightMarshalNestedObject2(wireFormat, (DataStructure)info.getConsumerId(), dataOut, bs);
        dataOut.writeInt(info.getPrefetch());

    }

    /**
     * Un-marshal an object instance from the data input stream
     *
     * @param o the object to un-marshal
     * @param dataIn the data input stream to build the object from
     * @throws IOException
     */
    public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn) throws IOException {
        super.looseUnmarshal(wireFormat, o, dataIn);

        ConsumerControl info = (ConsumerControl)o;
        info.setClose(dataIn.readBoolean());
        info.setConsumerId((org.apache.activemq.command
 .ConsumerId) looseUnmarsalNestedObject(wireFormat, dataIn));
        info.setPrefetch(dataIn.readInt());

    }


    /**
     * Write the booleans that this object uses to a BooleanStream
     */
    public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut) throws IOException {

        ConsumerControl info = (ConsumerControl)o;

        super.looseMarshal(wireFormat, o, dataOut);
        dataOut.writeBoolean(info.isClose());
        looseMarshalNestedObject(wireFormat, (DataStructure)info.getConsumerId(), dataOut);
        dataOut.writeInt(info.getPrefetch());

    }
}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerInfoMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerInfoMarshaller.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerInfoMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerInfoMarshaller.java Thu Apr 13 13:15:35 2006
@@ -91,6 +91,7 @@
         }
         info.setAdditionalPredicate((org.apache.activemq.filter.BooleanExpression) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
         info.setNetworkSubscription(bs.readBoolean());
+        info.setOptimizedAcknowledge(bs.readBoolean());
 
     }
 
@@ -115,6 +116,7 @@
         rc += tightMarshalObjectArray1(wireFormat, info.getBrokerPath(), bs);
         rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getAdditionalPredicate(), bs);
         bs.writeBoolean(info.isNetworkSubscription());
+        bs.writeBoolean(info.isOptimizedAcknowledge());
 
         return rc + 9;
     }
@@ -145,6 +147,7 @@
         tightMarshalObjectArray2(wireFormat, info.getBrokerPath(), dataOut, bs);
         tightMarshalNestedObject2(wireFormat, (DataStructure)info.getAdditionalPredicate(), dataOut, bs);
         bs.readBoolean();
+        bs.readBoolean();
 
     }
 
@@ -185,6 +188,7 @@
         }
         info.setAdditionalPredicate((org.apache.activemq.filter.BooleanExpression) looseUnmarsalNestedObject(wireFormat, dataIn));
         info.setNetworkSubscription(dataIn.readBoolean());
+        info.setOptimizedAcknowledge(dataIn.readBoolean());
 
     }
 
@@ -212,6 +216,7 @@
         looseMarshalObjectArray(wireFormat, info.getBrokerPath(), dataOut);
         looseMarshalNestedObject(wireFormat, (DataStructure)info.getAdditionalPredicate(), dataOut);
         dataOut.writeBoolean(info.isNetworkSubscription());
+        dataOut.writeBoolean(info.isOptimizedAcknowledge());
 
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java Thu Apr 13 13:15:35 2006
@@ -81,8 +81,10 @@
         add(new DestinationInfoMarshaller());
         add(new ShutdownInfoMarshaller());
         add(new DataResponseMarshaller());
+        add(new ConnectionControlMarshaller());
         add(new KeepAliveInfoMarshaller());
         add(new FlushCommandMarshaller());
+        add(new ConsumerControlMarshaller());
         add(new JournalTopicAckMarshaller());
         add(new BrokerIdMarshaller());
         add(new MessageDispatchMarshaller());

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java Thu Apr 13 13:15:35 2006
@@ -1,60 +1,53 @@
 /**
- *
+ * 
  * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.activemq.transport;
 
 import java.io.IOException;
-
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Response;
-
-
 /**
  * @version $Revision: 1.5 $
  */
-public class TransportFilter extends DefaultTransportListener implements Transport {
-
+public class TransportFilter implements TransportListener,Transport{
     final protected Transport next;
     private TransportListener transportListener;
 
-    public TransportFilter(Transport next) {
-        this.next = next;
+    public TransportFilter(Transport next){
+        this.next=next;
     }
 
-    public TransportListener getTransportListener() {
+    public TransportListener getTransportListener(){
         return transportListener;
     }
-    
-    public void setTransportListener(TransportListener channelListener) {
-        this.transportListener = channelListener;
-        if (channelListener == null)
+
+    public void setTransportListener(TransportListener channelListener){
+        this.transportListener=channelListener;
+        if(channelListener==null)
             next.setTransportListener(null);
         else
             next.setTransportListener(this);
     }
 
-
     /**
      * @see org.apache.activemq.Service#start()
-     * @throws IOException if the next channel has not been set.
+     * @throws IOException
+     *             if the next channel has not been set.
      */
-    public void start() throws Exception {
-        if( next == null )
+    public void start() throws Exception{
+        if(next==null)
             throw new IOException("The next channel has not been set.");
-        if( transportListener == null )
+        if(transportListener==null)
             throw new IOException("The command listener has not been set.");
         next.start();
     }
@@ -62,51 +55,57 @@
     /**
      * @see org.apache.activemq.Service#stop()
      */
-    public void stop() throws Exception {
+    public void stop() throws Exception{
         next.stop();
-    }    
+    }
 
-    public void onCommand(Command command) {
+    public void onCommand(Command command){
         transportListener.onCommand(command);
     }
 
     /**
      * @return Returns the next.
      */
-    public Transport getNext() {
+    public Transport getNext(){
         return next;
     }
 
-
-    public String toString() {
+    public String toString(){
         return next.toString();
     }
 
-    public void oneway(Command command) throws IOException {
+    public void oneway(Command command) throws IOException{
         next.oneway(command);
     }
 
-    public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
-        return next.asyncRequest(command, null);
+    public FutureResponse asyncRequest(Command command,ResponseCallback responseCallback) throws IOException{
+        return next.asyncRequest(command,null);
     }
 
-    public Response request(Command command) throws IOException {
+    public Response request(Command command) throws IOException{
         return next.request(command);
     }
-    
-    public Response request(Command command,int timeout) throws IOException {
+
+    public Response request(Command command,int timeout) throws IOException{
         return next.request(command,timeout);
     }
 
-    public void onException(IOException error) {
+    public void onException(IOException error){
         transportListener.onException(error);
     }
 
-    public Object narrow(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
+    public void transportInterupted(){
+        transportListener.transportInterupted();
+    }
+
+    public void transportResumed(){
+        transportListener.transportResumed();
+    }
+
+    public Object narrow(Class target){
+        if(target.isAssignableFrom(getClass())){
             return this;
         }
         return next.narrow(target);
-    }  
-    
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Apr 13 13:15:35 2006
@@ -78,7 +78,7 @@
     private long reconnectDelay = initialReconnectDelay;
     private Exception connectionFailure;
 
-    private final TransportListener myTransportListener = new DefaultTransportListener() {
+    private final TransportListener myTransportListener = new TransportListener() {
         public void onCommand(Command command) {
             if (command == null) {
                 return;
@@ -113,6 +113,18 @@
                 transportListener.onException(new InterruptedIOException());
             }
         }
+        
+        public void transportInterupted(){
+            if (transportListener != null){
+                transportListener.transportInterupted();
+            }
+        }
+
+        public void transportResumed(){
+            if(transportListener != null){
+                transportListener.transportResumed();
+            }
+        }
     };
 
     public FailoverTransport() throws InterruptedIOException {
@@ -147,9 +159,11 @@
                                     Transport t = TransportFactory.compositeConnect(uri);
                                     t.setTransportListener(myTransportListener);
                                     t.start();
+                                    
                                     if (started) {
                                         restoreTransport(t);
                                     }
+                                    
                                     log.debug("Connection established");
                                     reconnectDelay = initialReconnectDelay;
                                     connectedTransportURI = uri;
@@ -159,6 +173,7 @@
                                     if (transportListener != null){
                                         transportListener.transportResumed();
                                     }
+                                   
                                     return false;
                                 }
                                 catch (Exception e) {

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java Thu Apr 13 13:15:35 2006
@@ -62,6 +62,8 @@
             }
         info.setBrokerName("BrokerName:4");
         info.setSlaveBroker(true);
+        info.setMasterBroker(false);
+        info.setFaultTolerantConfiguration(true);
 
             }
         }

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionControlTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionControlTest.java?rev=393912&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionControlTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionControlTest.java Thu Apr 13 13:15:35 2006
@@ -0,0 +1 @@
+/**
 *
 * Copyright 2005-2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.openwire.v1;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;


/**
 * Test case for the OpenWire marshalling for ConnectionControl
 *
 *
 * NOTE!: This file is auto generated - do not modify!
 *        if you need to make a change, please modify the groovy scripts
 *        under src/gram/script and then use maven openwire:generate to regenerate
 *        this file.
 *
 * @version $Revision: $
 */
public class ConnectionControlTest extends BaseCommandTestSupport {

    /** Statically created instance of this test case. */
    public static ConnectionControlTest SINGLETON = new ConnectionControlTest();

    /**
     * Builds a new ConnectionControl and fills it in through
     * {@link #populateObject(Object)}.
     *
     * @return the populated ConnectionControl
     * @throws Exception propagated from {@link #populateObject(Object)}
     */
    public Object createObject() throws Exception {
        final ConnectionControl control = new ConnectionControl();
        populateObject(control);
        return control;
    }

    /**
     * Lets the superclass populate the object first, then sets the
     * ConnectionControl flags: close, faultTolerant and suspend to
     * true; exit and resume to false.
     *
     * @param object the command to populate; it is cast to ConnectionControl
     * @throws Exception propagated from the superclass implementation
     */
    protected void populateObject(Object object) throws Exception {
        super.populateObject(object);
        final ConnectionControl control = (ConnectionControl) object;
        control.setClose(true);
        control.setExit(false);
        control.setFaultTolerant(true);
        control.setResume(false);
        control.setSuspend(true);
    }
}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java Thu Apr 13 13:15:35 2006
@@ -62,6 +62,8 @@
 	            }
 	            info.setBrokerPath(value);
             }
+        info.setBrokerMasterConnector(true);
+        info.setManageable(false);
 
             }
         }

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerControlTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerControlTest.java?rev=393912&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerControlTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerControlTest.java Thu Apr 13 13:15:35 2006
@@ -0,0 +1 @@
+/**
 *
 * Copyright 2005-2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.openwire.v1;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;


/**
 * Test case for the OpenWire marshalling for ConsumerControl
 *
 *
 * NOTE!: This file is auto generated - do not modify!
 *        if you need to make a change, please modify the groovy scripts
 *        under src/gram/script and then use maven openwire:generate to regenerate
 *        this file.
 *
 * @version $Revision: $
 */
public class ConsumerControlTest extends BaseCommandTestSupport {

    /** Statically created instance of this test case. */
    public static ConsumerControlTest SINGLETON = new ConsumerControlTest();

    /**
     * Builds a new ConsumerControl and fills it in through
     * {@link #populateObject(Object)}.
     *
     * @return the populated ConsumerControl
     * @throws Exception propagated from {@link #populateObject(Object)}
     */
    public Object createObject() throws Exception {
        final ConsumerControl control = new ConsumerControl();
        populateObject(control);
        return control;
    }

    /**
     * Lets the superclass populate the object first, then sets the
     * ConsumerControl fields: close to true, the consumer id to the value
     * created from "ConsumerId:1", and prefetch to 1.
     *
     * @param object the command to populate; it is cast to ConsumerControl
     * @throws Exception propagated from the superclass implementation
     */
    protected void populateObject(Object object) throws Exception {
        super.populateObject(object);
        final ConsumerControl control = (ConsumerControl) object;
        control.setClose(true);
        control.setConsumerId(createConsumerId("ConsumerId:1"));
        control.setPrefetch(1);
    }
}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java Thu Apr 13 13:15:35 2006
@@ -72,6 +72,7 @@
             }
         info.setAdditionalPredicate(createBooleanExpression("AdditionalPredicate:6"));
         info.setNetworkSubscription(false);
+        info.setOptimizedAcknowledge(true);
 
             }
         }