You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/11 04:18:08 UTC

svn commit: r783607 [1/2] - in /activemq/sandbox/activemq-flow: activemq-all/src/test/java/org/apache/activemq/broker/openwire/ activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ activemq-broker/src/main/java/org/apache/activemq/apollo/ acti...

Author: cmacnaug
Date: Thu Jun 11 02:18:07 2009
New Revision: 783607

URL: http://svn.apache.org/viewvc?rev=783607&view=rev
Log:
Cleaning up MultiWireFormatFactory a bit (so that BrokerConnection works without it).

Also some cleanup in OpenwireProtocolHandler to avoid a broker exception on connection close.

Removed:
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java
Modified:
    activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
    activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
    activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java
    activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-flow/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java

Modified: activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java Thu Jun 11 02:18:07 2009
@@ -15,6 +15,14 @@
     protected RemoteConsumer createConsumer() {
         return new OpenwireRemoteConsumer();
     }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.broker.BrokerTestBase#getRemoteWireFormat()
+     */
+    @Override
+    protected String getRemoteWireFormat() {
+         return "openwire";
+    }
     
 
 }

Modified: activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java Thu Jun 11 02:18:07 2009
@@ -6,10 +6,10 @@
 
 import java.io.IOException;
 
-import org.apache.activemq.WindowLimiter;
-import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.apollo.WindowLimiter;
+import org.apache.activemq.apollo.broker.MessageDelivery;
 import org.apache.activemq.broker.RemoteConsumer;
-import org.apache.activemq.broker.Router;
+import org.apache.activemq.apollo.broker.Router;
 import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;

Modified: activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java Thu Jun 11 02:18:07 2009
@@ -9,10 +9,10 @@
 
 import javax.jms.JMSException;
 
-import org.apache.activemq.WindowLimiter;
-import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.apollo.WindowLimiter;
+import org.apache.activemq.apollo.broker.MessageDelivery;
 import org.apache.activemq.broker.RemoteProducer;
-import org.apache.activemq.broker.Router;
+import org.apache.activemq.apollo.broker.Router;
 import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;

Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Thu Jun 11 02:18:07 2009
@@ -611,4 +611,9 @@
             }
         });
     }
+
+    public WireFormat getWireformat()
+    {
+        return wireFormat;
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java Thu Jun 11 02:18:07 2009
@@ -52,6 +52,7 @@
 
     public void start() throws Exception {
         transport.setTransportListener(this);
+        
         if (transport instanceof DispatchableTransport) {
             DispatchableTransport dt = ((DispatchableTransport) transport);
             if (name != null) {
@@ -146,6 +147,10 @@
         }
     }
 
+    public void setStopping() {
+        stopping.set(true);
+    }
+    
     public boolean isStopping() {
         return stopping.get();
     }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java Thu Jun 11 02:18:07 2009
@@ -22,7 +22,7 @@
 import org.apache.activemq.apollo.broker.protocol.ProtocolHandler;
 import org.apache.activemq.apollo.broker.protocol.ProtocolHandlerFactory;
 import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.wireformat.MultiWireFormatFactory.WireFormatConnected;
+import org.apache.activemq.wireformat.WireFormat;
 
 public class BrokerConnection extends Connection {
     
@@ -60,17 +60,15 @@
             protocolHandler.onCommand(command);
         } else {
             try {
-
-                WireFormatConnected wfconnected = (WireFormatConnected) command;
-                String wfName = wfconnected.getWireFormatFactory().wireformatName();
+                WireFormat wireformat = transport.getWireformat();
                 try {
-                    protocolHandler = ProtocolHandlerFactory.createProtocolHandler(wfName);
+                    protocolHandler = ProtocolHandlerFactory.createProtocolHandler(wireformat.getName());
                 } catch(Exception e) {
-                    throw IOExceptionSupport.create("No protocol handler available for: "+wfName, e);
+                    throw IOExceptionSupport.create("No protocol handler available for: "+wireformat.getName(), e);
                 }
                 
                 protocolHandler.setConnection(this);
-                protocolHandler.setWireFormat(wfconnected.getWireFormat());
+                protocolHandler.setWireFormat(wireformat);
                 protocolHandler.start();
                 
                 setExceptionListener(new ExceptionListener(){

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Thu Jun 11 02:18:07 2009
@@ -40,7 +40,7 @@
 
 public abstract class BrokerTestBase extends TestCase {
 
-    protected static final int PERFORMANCE_SAMPLES = 5;
+    protected static final int PERFORMANCE_SAMPLES = 3;
 
     protected static final int IO_WORK_AMOUNT = 0;
     protected static final int FANIN_COUNT = 10;
@@ -63,7 +63,7 @@
     // Set to use tcp IO
     protected boolean tcp = true;
     // set to force marshalling even in the NON tcp case.
-    protected boolean forceMarshalling = false;
+    protected boolean forceMarshalling = true;
 
     protected String sendBrokerBindURI;
     protected String receiveBrokerBindURI;
@@ -95,24 +95,37 @@
     protected void setUp() throws Exception {
         dispatcher = createDispatcher();
         dispatcher.start();
+        
+        String brokerWireFormat = getRemoteWireFormat(); // default: brokers bind with the same format the remotes use
+        if(getSupportedWireFormats() != null)
+        {
+            brokerWireFormat= "multi&wireFormat.wireFormats=" + getSupportedWireFormats(); // subclass supplied a list: bind with "multi" instead
+        }
+        
         if (tcp) {
-            sendBrokerBindURI = "tcp://localhost:10000?wireFormat=multi";
-            receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=multi";
-            sendBrokerConnectURI = "tcp://localhost:10000";
-            receiveBrokerConnectURI = "tcp://localhost:20000";
+            sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + brokerWireFormat;
+            receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=" + brokerWireFormat;
+            sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=" + getRemoteWireFormat();
+            receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=" + getRemoteWireFormat(); // "?wireFormat=" is required; without it the format name is glued onto the port
         } else {
+            sendBrokerConnectURI = "pipe://SendBroker";
+            receiveBrokerConnectURI = "pipe://ReceiveBroker";
             if (forceMarshalling) {
-                sendBrokerBindURI = "pipe://SendBroker";
-                receiveBrokerBindURI = "pipe://ReceiveBroker";
+                sendBrokerBindURI = sendBrokerConnectURI + "?wireFormat=" + getRemoteWireFormat();
+                receiveBrokerBindURI = receiveBrokerConnectURI + "?wireFormat=" + getRemoteWireFormat();
             } else {
-                sendBrokerBindURI = "pipe://SendBroker";
-                receiveBrokerBindURI = "pipe://ReceiveBroker";
+                sendBrokerBindURI = sendBrokerConnectURI;
+                receiveBrokerBindURI = receiveBrokerConnectURI;
             }
-            sendBrokerConnectURI = sendBrokerBindURI;
-            receiveBrokerConnectURI = receiveBrokerBindURI;
         }
     }
 
+    protected String getSupportedWireFormats() {
+        return null;
+    }
+
+    protected abstract String getRemoteWireFormat();
+
     protected IDispatcher createDispatcher() {
         return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", MessageBroker.MAX_PRIORITY, asyncThreadPoolSize);
     }

Modified: activemq/sandbox/activemq-flow/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-flow/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java Thu Jun 11 02:18:07 2009
@@ -26,7 +26,7 @@
 
 public class SimpleLoadBalancer<D extends IDispatcher> implements ExecutionLoadBalancer<D> {
 
-    private final boolean DEBUG = true;
+    private final boolean DEBUG = false;
 
     //TODO: Added plumbing for periodic rebalancing which we should
     //consider implementing

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Thu Jun 11 02:18:07 2009
@@ -17,7 +17,6 @@
 package org.apache.activemq.broker.openwire;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 
@@ -98,227 +97,238 @@
     private OpenWireFormat storeWireFormat;
     private Router router;
     private VirtualHost host;
+    private final CommandVisitor visitor;
 
     public OpenwireProtocolHandler() {
         setStoreWireFormat(new OpenWireFormat());
-    }
+        visitor = new CommandVisitor() {
 
-    public void start() throws Exception {
+            // /////////////////////////////////////////////////////////////////
+            // Methods that keep track of the client state
+            // /////////////////////////////////////////////////////////////////
+            public Response processAddConnection(ConnectionInfo info) throws Exception {
+                connection.setName(info.getClientId());
+                return ack(info);
+            }
 
-    }
+            public Response processAddSession(SessionInfo info) throws Exception {
+                return ack(info);
+            }
 
-    public void stop() throws Exception {
-    }
+            public Response processAddProducer(ProducerInfo info) throws Exception {
+                producers.put(info.getProducerId(), new ProducerContext(info));
+                return ack(info);
+            }
 
-    public void onCommand(Object o) {
+            public Response processAddConsumer(ConsumerInfo info) throws Exception {
+                ConsumerContext ctx = new ConsumerContext(info);
+                consumers.put(info.getConsumerId(), ctx);
+                return ack(info);
+            }
 
-        final Command command = (Command) o;
-        boolean responseRequired = command.isResponseRequired();
-        try {
-            command.visit(new CommandVisitor() {
+            public Response processRemoveConnection(ConnectionId info, long arg1) throws Exception {
+                return null;
+            }
 
-                // /////////////////////////////////////////////////////////////////
-                // Methods that keep track of the client state
-                // /////////////////////////////////////////////////////////////////
-                public Response processAddConnection(ConnectionInfo info) throws Exception {
-                    connection.setName(info.getClientId());
-                    return ack(command);
-                }
+            public Response processRemoveSession(SessionId info, long arg1) throws Exception {
+                return null;
+            }
 
-                public Response processAddSession(SessionInfo info) throws Exception {
-                    return ack(command);
-                }
+            public Response processRemoveProducer(ProducerId info) throws Exception {
+                producers.remove(info);
+                return null;
+            }
 
-                public Response processAddProducer(ProducerInfo info) throws Exception {
-                    producers.put(info.getProducerId(), new ProducerContext(info));
-                    return ack(command);
+            public Response processRemoveConsumer(ConsumerId info, long arg1) throws Exception {
+                ConsumerContext ctx = consumers.remove(info);
+                if (ctx == null) {
+                    
                 }
+                return null;
+            }
 
-                public Response processAddConsumer(ConsumerInfo info) throws Exception {
-                    ConsumerContext ctx = new ConsumerContext(info);
-                    consumers.put(info.getConsumerId(), ctx);
-                    return ack(command);
-                }
+            // /////////////////////////////////////////////////////////////////
+            // Message Processing Methods.
+            // /////////////////////////////////////////////////////////////////
+            public Response processMessage(Message info) throws Exception {
+                ProducerId producerId = info.getProducerId();
+                ProducerContext producerContext = producers.get(producerId);
 
-                public Response processRemoveConnection(ConnectionId info, long arg1) throws Exception {
-                    return ack(command);
-                }
+                OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
+                md.setStoreWireFormat(storeWireFormat);
+                md.setPersistListener(OpenwireProtocolHandler.this);
 
-                public Response processRemoveSession(SessionId info, long arg1) throws Exception {
-                    return ack(command);
+                // Only producers that are not using a window will block,
+                // and if it blocks.
+                // yes we block the connection's read thread. yes other
+                // sessions will not get
+                // serviced while we block here. The producer is depending
+                // on TCP flow
+                // control to slow him down so we have to stop ready from
+                // the socket at this
+                // point.
+                while (!producerContext.controller.offer(md, null)) {
+                    producerContext.controller.waitForFlowUnblock();
                 }
+                return null;
+            }
 
-                public Response processRemoveProducer(ProducerId info) throws Exception {
-                    producers.remove(info);
-                    return ack(command);
-                }
+            public Response processMessageAck(MessageAck info) throws Exception {
+                ConsumerContext ctx = consumers.get(info.getConsumerId());
+                ctx.ack(info);
+                return ack(info);
+            }
 
-                public Response processRemoveConsumer(ConsumerId info, long arg1) throws Exception {
-                    return ack(command);
-                }
+            // Only used when client prefetch is set to zero.
+            public Response processMessagePull(MessagePull info) throws Exception {
+                return ack(info);
+            }
 
-                // /////////////////////////////////////////////////////////////////
-                // Message Processing Methods.
-                // /////////////////////////////////////////////////////////////////
-                public Response processMessage(Message info) throws Exception {
-                    ProducerId producerId = info.getProducerId();
-                    ProducerContext producerContext = producers.get(producerId);
-
-                    OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
-                    md.setStoreWireFormat(storeWireFormat);
-                    md.setPersistListener(OpenwireProtocolHandler.this);
-
-                    // Only producers that are not using a window will block,
-                    // and if it blocks.
-                    // yes we block the connection's read thread. yes other
-                    // sessions will not get
-                    // serviced while we block here. The producer is depending
-                    // on TCP flow
-                    // control to slow him down so we have to stop ready from
-                    // the socket at this
-                    // point.
-                    while (!producerContext.controller.offer(md, null)) {
-                        producerContext.controller.waitForFlowUnblock();
-                    }
-                    return null;
-                }
+            // /////////////////////////////////////////////////////////////////
+            // Control Methods
+            // /////////////////////////////////////////////////////////////////
+            public Response processWireFormat(WireFormatInfo info) throws Exception {
 
-                public Response processMessageAck(MessageAck info) throws Exception {
-                    ConsumerContext ctx = consumers.get(info.getConsumerId());
-                    ctx.ack(info);
-                    return ack(command);
-                }
+                // Negotiate the openwire encoding options.
+                WireFormatNegotiator wfn = new WireFormatNegotiator(connection.getTransport(), wireFormat, 1);
+                wfn.sendWireFormat();
+                wfn.negociate(info);
 
-                // Only used when client prefetch is set to zero.
-                public Response processMessagePull(MessagePull info) throws Exception {
-                    return ack(command);
-                }
+                // Now that the encoding is negotiated.. let the client know
+                // the details about this
+                // broker.
+                BrokerInfo brokerInfo = new BrokerInfo();
+                brokerInfo.setBrokerId(new BrokerId(connection.getBroker().getName()));
+                brokerInfo.setBrokerName(connection.getBroker().getName());
+                brokerInfo.setBrokerURL(connection.getBroker().getBindUri());
+                connection.write(brokerInfo);
+                return ack(info);
+            }
 
-                // /////////////////////////////////////////////////////////////////
-                // Control Methods
-                // /////////////////////////////////////////////////////////////////
-                public Response processWireFormat(WireFormatInfo info) throws Exception {
-
-                    // Negotiate the openwire encoding options.
-                    WireFormatNegotiator wfn = new WireFormatNegotiator(connection.getTransport(), wireFormat, 1);
-                    wfn.sendWireFormat();
-                    wfn.negociate(info);
-
-                    // Now that the encoding is negotiated.. let the client know
-                    // the details about this
-                    // broker.
-                    BrokerInfo brokerInfo = new BrokerInfo();
-                    brokerInfo.setBrokerId(new BrokerId(connection.getBroker().getName()));
-                    brokerInfo.setBrokerName(connection.getBroker().getName());
-                    brokerInfo.setBrokerURL(connection.getBroker().getBindUri());
-                    connection.write(brokerInfo);
-                    return ack(command);
-                }
+            public Response processShutdown(ShutdownInfo info) throws Exception {
+                connection.setStopping();
+                return ack(info);
+            }
 
-                public Response processShutdown(ShutdownInfo info) throws Exception {
-                    return ack(command);
+            public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+                if (info.isResponseRequired()) {
+                    info.setResponseRequired(false);
+                    connection.write(info);
                 }
+                return null;
+            }
 
-                public Response processKeepAlive(KeepAliveInfo info) throws Exception {
-                    if (info.isResponseRequired()) {
-                        info.setResponseRequired(false);
-                        connection.write(info);
-                    }
-                    return null;
-                }
+            public Response processFlush(FlushCommand info) throws Exception {
+                return ack(info);
+            }
 
-                public Response processFlush(FlushCommand info) throws Exception {
-                    return ack(command);
-                }
+            public Response processConnectionControl(ConnectionControl info) throws Exception {
+                return ack(info);
+            }
 
-                public Response processConnectionControl(ConnectionControl info) throws Exception {
-                    return ack(command);
-                }
+            public Response processConnectionError(ConnectionError info) throws Exception {
+                return ack(info);
+            }
 
-                public Response processConnectionError(ConnectionError info) throws Exception {
-                    return ack(command);
-                }
+            public Response processConsumerControl(ConsumerControl info) throws Exception {
+                return ack(info);
+            }
 
-                public Response processConsumerControl(ConsumerControl info) throws Exception {
-                    return ack(command);
-                }
+            // /////////////////////////////////////////////////////////////////
+            // Methods for server management
+            // /////////////////////////////////////////////////////////////////
+            public Response processAddDestination(DestinationInfo info) throws Exception {
+                throw new UnsupportedOperationException();
+            }
 
-                // /////////////////////////////////////////////////////////////////
-                // Methods for server management
-                // /////////////////////////////////////////////////////////////////
-                public Response processAddDestination(DestinationInfo info) throws Exception {
-                    throw new UnsupportedOperationException();
-                }
+            public Response processRemoveDestination(DestinationInfo info) throws Exception {
+                throw new UnsupportedOperationException();
+            }
 
-                public Response processRemoveDestination(DestinationInfo info) throws Exception {
-                    throw new UnsupportedOperationException();
-                }
+            public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
+                throw new UnsupportedOperationException();
+            }
 
-                public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
-                    throw new UnsupportedOperationException();
-                }
+            public Response processControlCommand(ControlCommand info) throws Exception {
+                throw new UnsupportedOperationException();
+            }
 
-                public Response processControlCommand(ControlCommand info) throws Exception {
-                    throw new UnsupportedOperationException();
-                }
+            // /////////////////////////////////////////////////////////////////
+            // Methods for transaction management
+            // /////////////////////////////////////////////////////////////////
+            public Response processBeginTransaction(TransactionInfo info) throws Exception {
+                throw new UnsupportedOperationException();
+            }
 
-                // /////////////////////////////////////////////////////////////////
-                // Methods for transaction management
-                // /////////////////////////////////////////////////////////////////
-                public Response processBeginTransaction(TransactionInfo info) throws Exception {
-                    throw new UnsupportedOperationException();
-                }
+            public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+                throw new UnsupportedOperationException();
+            }
 
-                public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
-                    throw new UnsupportedOperationException();
-                }
+            public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+                throw new UnsupportedOperationException();
+            }
 
-                public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
-                    throw new UnsupportedOperationException();
-                }
+            public Response processEndTransaction(TransactionInfo info) throws Exception {
+                throw new UnsupportedOperationException();
+            }
 
-                public Response processEndTransaction(TransactionInfo info) throws Exception {
-                    throw new UnsupportedOperationException();
-                }
+            public Response processForgetTransaction(TransactionInfo info) throws Exception {
+                throw new UnsupportedOperationException();
+            }
 
-                public Response processForgetTransaction(TransactionInfo info) throws Exception {
-                    throw new UnsupportedOperationException();
-                }
+            public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+                throw new UnsupportedOperationException();
+            }
 
-                public Response processPrepareTransaction(TransactionInfo info) throws Exception {
-                    throw new UnsupportedOperationException();
-                }
+            public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+                throw new UnsupportedOperationException();
+            }
 
-                public Response processRecoverTransactions(TransactionInfo info) throws Exception {
-                    throw new UnsupportedOperationException();
-                }
+            public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+                throw new UnsupportedOperationException();
+            }
 
-                public Response processRollbackTransaction(TransactionInfo info) throws Exception {
-                    throw new UnsupportedOperationException();
-                }
+            // /////////////////////////////////////////////////////////////////
+            // Methods for cluster operations
+            // These commands are sent to the broker when it's acting like a
+            // client to another broker.
+            // /////////////////////////////////////////////////////////////////
+            public Response processBrokerInfo(BrokerInfo info) throws Exception {
+                throw new UnsupportedOperationException();
+            }
 
-                // /////////////////////////////////////////////////////////////////
-                // Methods for cluster operations
-                // These commands are sent to the broker when it's acting like a
-                // client to another broker.
-                // /////////////////////////////////////////////////////////////////
-                public Response processBrokerInfo(BrokerInfo info) throws Exception {
-                    throw new UnsupportedOperationException();
-                }
+            public Response processMessageDispatch(MessageDispatch info) throws Exception {
+                throw new UnsupportedOperationException();
+            }
 
-                public Response processMessageDispatch(MessageDispatch info) throws Exception {
-                    throw new UnsupportedOperationException();
-                }
+            public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception {
+                throw new UnsupportedOperationException();
+            }
 
-                public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception {
-                    throw new UnsupportedOperationException();
-                }
+            public Response processProducerAck(ProducerAck info) throws Exception {
+                return ack(info);
+            }
+        };
+    }
 
-                public Response processProducerAck(ProducerAck info) throws Exception {
-                    return ack(command);
-                }
+    public void start() throws Exception {
+
+    }
 
-            });
+    public void stop() throws Exception {
+    }
+
+    public void onCommand(Object o) {
+
+        final Command command = (Command) o;
+        boolean responseRequired = command.isResponseRequired();
+        try {
+            Response response = command.visit(visitor);
+            
+            if (responseRequired && response == null) {
+                ack(command);
+            }
+            
         } catch (Exception e) {
             if (responseRequired) {
                 ExceptionResponse response = new ExceptionResponse(e);
@@ -439,7 +449,7 @@
             controller.useOverFlowQueue(false);
             controller.setExecutor(connection.getDispatcher().createPriorityExecutor(connection.getDispatcher().getDispatchPriorities() - 1));
             super.onFlowOpened(controller);
-            
+
             BrokerSubscription sub = host.createSubscription(this);
             sub.connect(this);
         }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Thu Jun 11 02:18:07 2009
@@ -44,6 +44,7 @@
 public final class OpenWireFormat implements WireFormat {
 
     public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_VERSION;
+    public static final String WIREFORMAT_NAME = "openwire"; 
 
     static final byte NULL_TYPE = CommandTypes.NULL;
     private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
@@ -113,6 +114,10 @@
     public int getVersion() {
         return version;
     }
+    
+    public String getName() {
+        return WIREFORMAT_NAME;
+    }
 
     public synchronized ByteSequence marshal(Object command) throws IOException {
 

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.openwire;
 
 import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
 
@@ -136,4 +137,25 @@
             long maxInactivityDurationInitalDelay) {
         this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
     }
+    
+    /* (non-Javadoc)
+     * @see org.apache.activemq.wireformat.WireFormatFactory#isDiscriminatable()
+     */
+    public boolean isDiscriminatable() {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.wireformat.WireFormatFactory#matchesWireformatHeader(org.apache.activemq.util.ByteSequence)
+     */
+    public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+        throw new UnsupportedOperationException();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.wireformat.WireFormatFactory#maxWireformatHeaderLength()
+     */
+    public int maxWireformatHeaderLength() {
+        throw new UnsupportedOperationException();
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Jun 11 02:18:07 2009
@@ -48,6 +48,7 @@
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.wireformat.WireFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -92,13 +93,12 @@
     private Exception connectionFailure;
     private boolean firstConnection = true;
     //optionally always have a backup created
-    private boolean backup=false;
-    private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>();
-    private int backupPoolSize=1;
+    private boolean backup = false;
+    private List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
+    private int backupPoolSize = 1;
     private boolean trackMessages = false;
     private int maxCacheSize = 128 * 1024;
     private TransportListener disposedListener = new DefaultTransportListener() {};
-    
 
     private final TransportListener myTransportListener = createTransportListener();
 
@@ -108,27 +108,27 @@
         // Setup a task that is used to reconnect the a connection async.
         reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
             public boolean iterate() {
-            	boolean result=false;
-            	boolean buildBackup=true;
-            	boolean doReconnect = !disposed;
-            	synchronized(backupMutex) {
-                	if (connectedTransport.get()==null && !disposed) {
-                		result=doReconnect();
-                		buildBackup=false;
-                	}
-            	}
-            	if(buildBackup) {
-            		buildBackups();
-            	}else {
-            		//build backups on the next iteration
-            		result=true;
-            		try {
+                boolean result = false;
+                boolean buildBackup = true;
+                boolean doReconnect = !disposed;
+                synchronized (backupMutex) {
+                    if (connectedTransport.get() == null && !disposed) {
+                        result = doReconnect();
+                        buildBackup = false;
+                    }
+                }
+                if (buildBackup) {
+                    buildBackups();
+                } else {
+                    //build backups on the next iteration
+                    result = true;
+                    try {
                         reconnectTask.wakeup();
                     } catch (InterruptedException e) {
                         LOG.debug("Reconnect task has been interrupted.", e);
                     }
-            	}
-            	return result;
+                }
+                return result;
             }
 
         }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
@@ -137,22 +137,22 @@
     TransportListener createTransportListener() {
         return new TransportListener() {
             public void onCommand(Object o) {
-                Command command = (Command)o;
+                Command command = (Command) o;
                 if (command == null) {
                     return;
                 }
                 if (command.isResponse()) {
                     Object object = null;
-                    synchronized(requestMap) {
-                     object = requestMap.remove(Integer.valueOf(((Response)command).getCorrelationId()));
+                    synchronized (requestMap) {
+                        object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
                     }
                     if (object != null && object.getClass() == Tracked.class) {
-                        ((Tracked)object).onResponses();
+                        ((Tracked) object).onResponses();
                     }
                 }
                 if (!initialized) {
                     if (command.isBrokerInfo()) {
-                        BrokerInfo info = (BrokerInfo)command;
+                        BrokerInfo info = (BrokerInfo) command;
                         BrokerInfo[] peers = info.getPeerBrokerInfos();
                         if (peers != null) {
                             for (int i = 0; i < peers.length; i++) {
@@ -192,28 +192,27 @@
         };
     }
 
-
     public final void handleTransportFailure(IOException e) throws InterruptedException {
-        
+
         Transport transport = connectedTransport.getAndSet(null);
-        if( transport!=null ) {
-            
+        if (transport != null) {
+
             transport.setTransportListener(disposedListener);
             ServiceSupport.dispose(transport);
-            
+
             synchronized (reconnectMutex) {
                 boolean reconnectOk = false;
-                if(started) {
-                    LOG.warn("Transport failed to " + connectedTransportURI+ " , attempting to automatically reconnect due to: " + e);
+                if (started) {
+                    LOG.warn("Transport failed to " + connectedTransportURI + " , attempting to automatically reconnect due to: " + e);
                     LOG.debug("Transport failed with the following exception:", e);
                     reconnectOk = true;
                 }
-                
+
                 initialized = false;
-                failedConnectTransportURI=connectedTransportURI;
+                failedConnectTransportURI = connectedTransportURI;
                 connectedTransportURI = null;
-                connected=false;
-                if(reconnectOk) {
+                connected = false;
+                if (reconnectOk) {
                     reconnectTask.wakeup();
                 }
             }
@@ -243,7 +242,7 @@
     }
 
     public void stop() throws Exception {
-        Transport transportToStop=null;
+        Transport transportToStop = null;
         synchronized (reconnectMutex) {
             LOG.debug("Stopped.");
             if (!started) {
@@ -252,7 +251,7 @@
             started = false;
             disposed = true;
             connected = false;
-            for (BackupTransport t:backups) {
+            for (BackupTransport t : backups) {
                 t.setDisposed(true);
             }
             backups.clear();
@@ -266,7 +265,7 @@
             sleepMutex.notifyAll();
         }
         reconnectTask.shutdown();
-        if( transportToStop!=null ) {
+        if (transportToStop != null) {
             transportToStop.stop();
         }
     }
@@ -320,14 +319,14 @@
     }
 
     public long getTimeout() {
-		return timeout;
-	}
+        return timeout;
+    }
 
-	public void setTimeout(long timeout) {
-		this.timeout = timeout;
-	}
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
 
-	/**
+    /**
      * @return Returns the randomize.
      */
     public boolean isRandomize() {
@@ -335,29 +334,30 @@
     }
 
     /**
-     * @param randomize The randomize to set.
+     * @param randomize
+     *            The randomize to set.
      */
     public void setRandomize(boolean randomize) {
         this.randomize = randomize;
     }
-    
+
     public boolean isBackup() {
-		return backup;
-	}
+        return backup;
+    }
+
+    public void setBackup(boolean backup) {
+        this.backup = backup;
+    }
+
+    public int getBackupPoolSize() {
+        return backupPoolSize;
+    }
+
+    public void setBackupPoolSize(int backupPoolSize) {
+        this.backupPoolSize = backupPoolSize;
+    }
 
-	public void setBackup(boolean backup) {
-		this.backup = backup;
-	}
-
-	public int getBackupPoolSize() {
-		return backupPoolSize;
-	}
-
-	public void setBackupPoolSize(int backupPoolSize) {
-		this.backupPoolSize = backupPoolSize;
-	}
-	
-	public boolean isTrackMessages() {
+    public boolean isTrackMessages() {
         return trackMessages;
     }
 
@@ -372,30 +372,29 @@
     public void setMaxCacheSize(int maxCacheSize) {
         this.maxCacheSize = maxCacheSize;
     }
-	
+
     /**
-     * @return Returns true if the command is one sent when a connection
-     * is being closed.
+     * @return Returns true if the command is one sent when a connection is
+     *         being closed.
      */
     private boolean isShutdownCommand(Command command) {
-	return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo));
+        return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo));
     }
-	 
 
     public void oneway(Object o) throws IOException {
-        
-        Command command = (Command)o;
+
+        Command command = (Command) o;
         Exception error = null;
         try {
 
             synchronized (reconnectMutex) {
-            	
+
                 if (isShutdownCommand(command) && connectedTransport.get() == null) {
-                    if(command.isShutdownInfo()) {
+                    if (command.isShutdownInfo()) {
                         // Skipping send of ShutdownInfo command when not connected.
                         return;
                     }
-                    if(command instanceof RemoveInfo) {
+                    if (command instanceof RemoveInfo) {
                         // Simulate response to RemoveInfo command
                         stateTracker.track(command);
                         Response response = new Response();
@@ -412,15 +411,13 @@
                         Transport transport = connectedTransport.get();
                         long start = System.currentTimeMillis();
                         boolean timedout = false;
-                        while (transport == null && !disposed
-                                && connectionFailure == null
-                                && !Thread.currentThread().isInterrupted()) {
+                        while (transport == null && !disposed && connectionFailure == null && !Thread.currentThread().isInterrupted()) {
                             LOG.trace("Waiting for transport to reconnect.");
                             long end = System.currentTimeMillis();
                             if (timeout > 0 && (end - start > timeout)) {
-                            	timedout = true;
-                            	LOG.info("Failover timed out after " + (end - start) + "ms");
-                            	break;
+                                timedout = true;
+                                LOG.info("Failover timed out after " + (end - start) + "ms");
+                                break;
                             }
                             try {
                                 reconnectMutex.wait(100);
@@ -439,8 +436,8 @@
                             } else if (connectionFailure != null) {
                                 error = connectionFailure;
                             } else if (timedout == true) {
-                            	error = new IOException("Failover timeout of " + timeout + " ms reached.");
-                            }else {
+                                error = new IOException("Failover timeout of " + timeout + " ms reached.");
+                            } else {
                                 error = new IOException("Unexpected failure.");
                             }
                             break;
@@ -451,7 +448,7 @@
                         // then hold it in the requestMap so that we can replay
                         // it later.
                         Tracked tracked = stateTracker.track(command);
-                        synchronized(requestMap) {
+                        synchronized (requestMap) {
                             if (tracked != null && tracked.isWaitingForResponse()) {
                                 requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
                             } else if (tracked == null && command.isResponseRequired()) {
@@ -488,7 +485,7 @@
 
                     } catch (IOException e) {
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);   
+                            LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
                         }
                         handleTransportFailure(e);
                     }
@@ -502,7 +499,7 @@
         if (!disposed) {
             if (error != null) {
                 if (error instanceof IOException) {
-                    throw (IOException)error;
+                    throw (IOException) error;
                 }
                 throw IOExceptionSupport.create(error);
             }
@@ -574,7 +571,7 @@
         if (randomize) {
             // Randomly, reorder the list by random swapping
             for (int i = 0; i < l.size(); i++) {
-                int p = (int) (Math.random()*100 % l.size());
+                int p = (int) (Math.random() * 100 % l.size());
                 URI t = l.get(p);
                 l.set(p, l.get(i));
                 l.set(i, t);
@@ -592,7 +589,7 @@
     }
 
     public void setTransportListener(TransportListener commandListener) {
-        synchronized(listenerMutex) {
+        synchronized (listenerMutex) {
             this.transportListener = commandListener;
             listenerMutex.notifyAll();
         }
@@ -604,7 +601,7 @@
             return target.cast(this);
         }
         Transport transport = connectedTransport.get();
-        if ( transport != null) {
+        if (transport != null) {
             return transport.narrow(target);
         }
         return null;
@@ -619,7 +616,7 @@
         t.oneway(cc);
         stateTracker.restore(t);
         Map tmpMap = null;
-        synchronized(requestMap) {
+        synchronized (requestMap) {
             tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
         }
         for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) {
@@ -645,7 +642,7 @@
 
     public String getRemoteAddress() {
         Transport transport = connectedTransport.get();
-        if ( transport != null) {
+        if (transport != null) {
             return transport.getRemoteAddress();
         }
         return null;
@@ -654,13 +651,13 @@
     public boolean isFaultTolerant() {
         return true;
     }
-    
+
     public boolean isUseInactivityMonitor() {
         //this is up to the underlying transport:
         return false;
     }
-    
-   final boolean doReconnect() {
+
+    final boolean doReconnect() {
         Exception failure = null;
         synchronized (reconnectMutex) {
 
@@ -678,32 +675,32 @@
                     if (!useExponentialBackOff) {
                         reconnectDelay = initialReconnectDelay;
                     }
-                    synchronized(backupMutex) {
+                    synchronized (backupMutex) {
                         if (backup && !backups.isEmpty()) {
-                        	BackupTransport bt = backups.remove(0);
+                            BackupTransport bt = backups.remove(0);
                             Transport t = bt.getTransport();
                             URI uri = bt.getUri();
                             t.setTransportListener(myTransportListener);
                             try {
-                                if (started) { 
-                                        restoreTransport(t);  
+                                if (started) {
+                                    restoreTransport(t);
                                 }
                                 reconnectDelay = initialReconnectDelay;
-                                failedConnectTransportURI=null;
+                                failedConnectTransportURI = null;
                                 connectedTransportURI = uri;
                                 connectedTransport.set(t);
                                 reconnectMutex.notifyAll();
                                 connectFailures = 0;
                                 LOG.info("Successfully reconnected to backup " + uri);
                                 return false;
-                            }catch (Exception e) {
-                                LOG.debug("Backup transport failed",e);
-                             }
+                            } catch (Exception e) {
+                                LOG.debug("Backup transport failed", e);
+                            }
                         }
                     }
-                    
+
                     Iterator<URI> iter = connectList.iterator();
-                    while(iter.hasNext() && connectedTransport.get() == null && !disposed) {
+                    while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
                         URI uri = iter.next();
                         Transport t = null;
                         try {
@@ -711,7 +708,7 @@
                             t = TransportFactory.compositeConnect(uri);
                             t.setTransportListener(myTransportListener);
                             t.start();
-                            
+
                             if (started) {
                                 restoreTransport(t);
                             }
@@ -722,36 +719,37 @@
                             connectedTransport.set(t);
                             reconnectMutex.notifyAll();
                             connectFailures = 0;
-                         // Make sure on initial startup, that the transportListener 
-                         // has been initialized for this instance.
-                            synchronized(listenerMutex) {
-                                if (transportListener==null) {
+                            // Make sure on initial startup, that the transportListener 
+                            // has been initialized for this instance.
+                            synchronized (listenerMutex) {
+                                if (transportListener == null) {
                                     try {
                                         //if it isn't set after 2secs - it
                                         //probably never will be
                                         listenerMutex.wait(2000);
-                                    }catch(InterruptedException ex) {}
+                                    } catch (InterruptedException ex) {
+                                    }
                                 }
                             }
                             if (transportListener != null) {
                                 transportListener.transportResumed();
-                            }else {
+                            } else {
                                 LOG.debug("transport resumed by transport listener not set");
                             }
                             if (firstConnection) {
-                                firstConnection=false;
+                                firstConnection = false;
                                 LOG.info("Successfully connected to " + uri);
-                            }else {
+                            } else {
                                 LOG.info("Successfully reconnected to " + uri);
                             }
-                            connected=true;
+                            connected = true;
                             return false;
                         } catch (Exception e) {
                             failure = e;
                             LOG.debug("Connect fail to: " + uri + ", reason: " + e);
-                            if (t!=null) {
+                            if (t != null) {
                                 try {
-                                    t.stop();       
+                                    t.stop();
                                 } catch (Exception ee) {
                                     LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
                                 }
@@ -760,29 +758,29 @@
                     }
                 }
             }
-            
+
             if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
                 LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
                 connectionFailure = failure;
- 	
+
                 // Make sure on initial startup, that the transportListener has been initialized
                 // for this instance.
-                synchronized(listenerMutex) {
-                    if (transportListener==null) {
+                synchronized (listenerMutex) {
+                    if (transportListener == null) {
                         try {
                             listenerMutex.wait(2000);
-                        }catch(InterruptedException ex) {}
+                        } catch (InterruptedException ex) {
+                        }
                     }
                 }
 
-          
-                if(transportListener != null) {
+                if (transportListener != null) {
                     if (connectionFailure instanceof IOException) {
-                    	transportListener.onException((IOException)connectionFailure);
+                        transportListener.onException((IOException) connectionFailure);
                     } else {
-                    	transportListener.onException(IOExceptionSupport.create(connectionFailure));
+                        transportListener.onException(IOExceptionSupport.create(connectionFailure));
                     }
-                }        
+                }
                 reconnectMutex.notifyAll();
                 return false;
             }
@@ -809,53 +807,65 @@
         return !disposed;
     }
 
-   
-   final boolean buildBackups() {
-	   synchronized (backupMutex) {
-		   if (!disposed && backup && backups.size() < backupPoolSize) {
-			   List<URI> connectList = getConnectList();
-			   //removed disposed backups
-			   List<BackupTransport>disposedList = new ArrayList<BackupTransport>();
-			   for (BackupTransport bt:backups) {
-				   if (bt.isDisposed()) {
-					   disposedList.add(bt);
-				   }
-			   }
-			   backups.removeAll(disposedList);
-			   disposedList.clear();
-			   for (Iterator<URI>iter = connectList.iterator();iter.hasNext() && backups.size() < backupPoolSize;) {
-				   URI uri = iter.next();
-				   if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
-					   try {
-						   BackupTransport bt = new BackupTransport(this);
-						   bt.setUri(uri);
-						   if (!backups.contains(bt)) {
-							   Transport t = TransportFactory.compositeConnect(uri);
-		                       t.setTransportListener(bt);
-		                       t.start();
-		                       bt.setTransport(t);
-		                       backups.add(bt);
-						   }
-					   }catch(Exception e) {
-						   LOG.debug("Failed to build backup ",e);
-					   }
-				   }
-			   }
-		   }
-	   }
-	   return false;
-   }
+    final boolean buildBackups() {
+        synchronized (backupMutex) {
+            if (!disposed && backup && backups.size() < backupPoolSize) {
+                List<URI> connectList = getConnectList();
+                //removed disposed backups
+                List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
+                for (BackupTransport bt : backups) {
+                    if (bt.isDisposed()) {
+                        disposedList.add(bt);
+                    }
+                }
+                backups.removeAll(disposedList);
+                disposedList.clear();
+                for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize;) {
+                    URI uri = iter.next();
+                    if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
+                        try {
+                            BackupTransport bt = new BackupTransport(this);
+                            bt.setUri(uri);
+                            if (!backups.contains(bt)) {
+                                Transport t = TransportFactory.compositeConnect(uri);
+                                t.setTransportListener(bt);
+                                t.start();
+                                bt.setTransport(t);
+                                backups.add(bt);
+                            }
+                        } catch (Exception e) {
+                            LOG.debug("Failed to build backup ", e);
+                        }
+                    }
+                }
+            }
+        }
+        return false;
+    }
 
     public boolean isDisposed() {
-    	return disposed;
+        return disposed;
     }
-    
-    
+
     public boolean isConnected() {
         return connected;
     }
-    
+
     public void reconnect(URI uri) throws IOException {
-    	add(new URI[] {uri});
+        add(new URI[] { uri });
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.transport.Transport#getWireformat()
+     */
+    public WireFormat getWireformat() {
+        Transport connected = connectedTransport.get();
+        if(connected == null)
+        {
+            return null;
+        }
+        return connected.getWireformat();
     }
 }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -19,14 +19,18 @@
 import org.apache.activemq.openwire.OpenWireFormatFactory;
 import org.apache.activemq.util.ByteSequence;
 
-public class DiscriminatableOpenWireFormatFactory extends OpenWireFormatFactory implements DiscriminatableWireFormatFactory {
+public class DiscriminatableOpenWireFormatFactory extends OpenWireFormatFactory {
 
-    private static final byte MAGIC[] = new byte[] {1, 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q'};
+    private static final byte MAGIC[] = new byte[] { 1, 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q' };
+
+    public boolean isDiscriminatable() {
+        return true;
+    }
 
     public boolean matchesWireformatHeader(ByteSequence byteSequence) {
-        if( byteSequence.length == 4+MAGIC.length ) {
-            for( int i=0; i < MAGIC.length; i++ ) {
-                if( byteSequence.data[i+4] != MAGIC[i] ) {
+        if (byteSequence.length == 4 + MAGIC.length) {
+            for (int i = 0; i < MAGIC.length; i++) {
+                if (byteSequence.data[i + 4] != MAGIC[i]) {
                     return false;
                 }
             }
@@ -36,11 +40,6 @@
     }
 
     public int maxWireformatHeaderLength() {
-        return 4+MAGIC.length;
+        return 4 + MAGIC.length;
     }
-
-    public String wireformatName() {
-        return "openwire";
-    }
-
 }

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -23,7 +23,10 @@
 
 public class Proto2WireFormatFactory implements WireFormatFactory {
 
+    
     public class TestWireFormat implements StatefulWireFormat {
+        public static final String WIREFORMAT_NAME = "proto";
+
         private ByteBuffer currentOut;
         private byte outType;
         
@@ -247,6 +250,14 @@
         public int getVersion() {
             return 0;
         }
+        
+        /* (non-Javadoc)
+         * @see org.apache.activemq.wireformat.WireFormat#getName()
+         */
+        public String getName() {
+            return WIREFORMAT_NAME;
+        }
+        
         public void setVersion(int version) {
         }
 
@@ -272,6 +283,26 @@
 
 	public WireFormat createWireFormat() {
 		return new TestWireFormat();
-	}	
+	}
 
+    /* (non-Javadoc)
+     * @see org.apache.activemq.wireformat.WireFormatFactory#isDiscriminatable()
+     */
+    public boolean isDiscriminatable() {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.wireformat.WireFormatFactory#matchesWireformatHeader(org.apache.activemq.util.ByteSequence)
+     */
+    public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+        throw new UnsupportedOperationException();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.wireformat.WireFormatFactory#maxWireformatHeaderLength()
+     */
+    public int maxWireformatHeaderLength() {
+        throw new UnsupportedOperationException();
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -23,6 +23,8 @@
 public class ProtoWireFormatFactory implements WireFormatFactory {
 
     public class TestWireFormat implements StatefulWireFormat {
+        public static final String WIREFORMAT_NAME = "proto";
+        
         private ByteBuffer currentOut;
         private byte outType;
         
@@ -240,10 +242,37 @@
         public Transport createTransportFilters(Transport transport, Map options) {
            return transport;
         }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.wireformat.WireFormat#getName()
+         */
+        public String getName() {
+            return WIREFORMAT_NAME;
+        }
     }
 
 	public WireFormat createWireFormat() {
 		return new TestWireFormat();
 	}	
+	
+	/* (non-Javadoc)
+     * @see org.apache.activemq.wireformat.WireFormatFactory#isDiscriminatable()
+     */
+    public boolean isDiscriminatable() {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.wireformat.WireFormatFactory#matchesWireformatHeader(org.apache.activemq.util.ByteSequence)
+     */
+    public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+        throw new UnsupportedOperationException();
+    }
 
+    /* (non-Javadoc)
+     * @see org.apache.activemq.wireformat.WireFormatFactory#maxWireformatHeaderLength()
+     */
+    public int maxWireformatHeaderLength() {
+        throw new UnsupportedOperationException();
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -1,5 +1,6 @@
 package org.apache.activemq.flow;
 
+import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.wireformat.ObjectStreamWireFormat;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
@@ -10,4 +11,24 @@
 		return new ObjectStreamWireFormat();
 	}	
 
+	/* (non-Javadoc)
+     * @see org.apache.activemq.wireformat.WireFormatFactory#isDiscriminatable()
+     */
+    public boolean isDiscriminatable() {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.wireformat.WireFormatFactory#matchesWireformatHeader(org.apache.activemq.util.ByteSequence)
+     */
+    public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+        throw new UnsupportedOperationException();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.wireformat.WireFormatFactory#maxWireformatHeaderLength()
+     */
+    public int maxWireformatHeaderLength() {
+        throw new UnsupportedOperationException();
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Thu Jun 11 02:18:07 2009
@@ -39,7 +39,7 @@
 public class StompWireFormat implements WireFormat {
 
     private static final byte[] NO_DATA = new byte[] {};
-    private static final byte[] END_OF_FRAME = new byte[] {0, '\n'};
+    private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
 
     private static final int MAX_COMMAND_LENGTH = 1024;
     private static final int MAX_HEADER_LENGTH = 1024 * 10;
@@ -47,6 +47,7 @@
     private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
 
     private int version = 1;
+    public static final String WIREFORMAT_NAME = "stomp";
 
     public ByteSequence marshal(Object command) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -63,7 +64,7 @@
     }
 
     public void marshal(Object command, DataOutput os) throws IOException {
-        StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame)command;
+        StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame) command;
 
         StringBuffer buffer = new StringBuffer();
         buffer.append(stomp.getAction());
@@ -71,7 +72,7 @@
 
         // Output the headers.
         for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
-            Map.Entry entry = (Map.Entry)iter.next();
+            Map.Entry entry = (Map.Entry) iter.next();
             buffer.append(entry.getKey());
             buffer.append(Stomp.Headers.SEPERATOR);
             buffer.append(entry.getValue());
@@ -200,14 +201,18 @@
         return version;
     }
 
+    public String getName() {
+        return WIREFORMAT_NAME;
+    }
+
     public void setVersion(int version) {
         this.version = version;
     }
 
-	public boolean inReceive() {
-		//TODO implement the inactivity monitor
-		return false;
-	}
+    public boolean inReceive() {
+        //TODO implement the inactivity monitor
+        return false;
+    }
 
     public Transport createTransportFilters(Transport transport, Map options) {
         if (transport.isUseInactivityMonitor()) {
@@ -215,7 +220,5 @@
         }
         return transport;
     }
-    
-    
 
 }

Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.stomp;
 
+import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
 
@@ -26,4 +27,25 @@
     public WireFormat createWireFormat() {
         return new StompWireFormat();
     }
+    
+    /* (non-Javadoc)
+     * @see org.apache.activemq.wireformat.WireFormatFactory#matchesWireformatHeader(org.apache.activemq.util.ByteSequence)
+     */
+    public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+        throw new UnsupportedOperationException();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.wireformat.WireFormatFactory#maxWireformatHeaderLength()
+     */
+    public int maxWireformatHeaderLength() {
+        throw new UnsupportedOperationException();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.wireformat.WireFormatFactory#isDiscriminatable()
+     */
+    public boolean isDiscriminatable() {
+        return false;
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -22,10 +22,14 @@
 import org.apache.activemq.transport.stomp.StompWireFormatFactory;
 import org.apache.activemq.util.ByteSequence;
 
-public class DiscriminatableStompWireFormatFactory extends StompWireFormatFactory implements DiscriminatableWireFormatFactory {
+public class DiscriminatableStompWireFormatFactory extends StompWireFormatFactory {
 
     static byte MAGIC[] = toBytes(Stomp.Commands.CONNECT);
     
+    public boolean isDiscriminatable() {
+        return true;
+    }
+    
     static private byte[] toBytes(String value) {
         try {
             return value.getBytes("UTF-8");
@@ -59,9 +63,4 @@
     public int maxWireformatHeaderLength() {
         return MAGIC.length+10;
     }
-
-    public String wireformatName() {
-        return "stomp";
-    }
-
 }

Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java Thu Jun 11 02:18:07 2009
@@ -7,17 +7,6 @@
 public class StompBrokerTest extends BrokerTestBase {
 
     @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        if (tcp) {
-            sendBrokerBindURI = "tcp://localhost:10000?wireFormat=multi&transport.useInactivityMonitor=false";
-            receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=multi&transport.useInactivityMonitor=false";
-            sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=stomp&useInactivityMonitor=false";
-            receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=stomp&useInactivityMonitor=false";
-        }
-    }
-    
-    @Override
     protected RemoteProducer cerateProducer() {
         return new StompRemoteProducer();
     }
@@ -27,5 +16,11 @@
         return new StompRemoteConsumer();
     }
     
-
+    /* (non-Javadoc)
+     * @see org.apache.activemq.broker.BrokerTestBase#getRemoteWireFormat()
+     */
+    @Override
+    protected String getRemoteWireFormat() {
+         return "stomp";
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java Thu Jun 11 02:18:07 2009
@@ -20,6 +20,7 @@
 import java.net.URI;
 
 import org.apache.activemq.Service;
+import org.apache.activemq.wireformat.WireFormat;
 
 /**
  * Represents the client side of a transport allowing messages to be sent
@@ -157,6 +158,11 @@
     boolean isConnected();
     
     /**
+     * @return The wireformat for the connection.
+     */
+    WireFormat getWireformat();
+    
+    /**
      * reconnect to another location
      * @param uri
      * @throws IOException on failure of if not supported

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Thu Jun 11 02:18:07 2009
@@ -19,6 +19,8 @@
 import java.io.IOException;
 import java.net.URI;
 
+import org.apache.activemq.wireformat.WireFormat;
+
 /**
  * @version $Revision: 1.5 $
  */
@@ -45,7 +47,8 @@
 
     /**
      * @see org.apache.activemq.Service#start()
-     * @throws IOException if the next channel has not been set.
+     * @throws IOException
+     *             if the next channel has not been set.
      */
     public void start() throws Exception {
         if (next == null) {
@@ -126,19 +129,23 @@
         return next.isFaultTolerant();
     }
 
-	public boolean isDisposed() {
-		return next.isDisposed();
-	}
-	
-	public boolean isConnected() {
+    public boolean isDisposed() {
+        return next.isDisposed();
+    }
+
+    public boolean isConnected() {
         return next.isConnected();
     }
 
-	public void reconnect(URI uri) throws IOException {
-		next.reconnect(uri);
-	}
+    public void reconnect(URI uri) throws IOException {
+        next.reconnect(uri);
+    }
 
     public boolean isUseInactivityMonitor() {
         return next.isUseInactivityMonitor();
     }
+
+    public WireFormat getWireformat() {
+        return next.getWireformat();
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Thu Jun 11 02:18:07 2009
@@ -199,6 +199,13 @@
         public void setDispatchPriority(int priority) {
             readContext.updatePriority(priority);
         }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.transport.Transport#getWireformat()
+         */
+        public WireFormat getWireformat() {
+            return wireFormat;
+        }
     }
 
     private class PipeTransportServer implements TransportServer {
@@ -267,7 +274,7 @@
 
             String node = uri.getHost();
             if (servers.containsKey(node)) {
-                throw new IOException("Server allready bound: " + node);
+                throw new IOException("Server already bound: " + node);
             }
             PipeTransportServer server = new PipeTransportServer();
             server.setConnectURI(uri);
@@ -275,6 +282,7 @@
             if (options.containsKey("wireFormat")) {
                 server.setWireFormatFactory(createWireFormatFactory(options));
             }
+                
             servers.put(node, server);
             return server;
         } catch (URISyntaxException e) {