You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/13 17:18:00 UTC

svn commit: r753311 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/wireformat/ test/java/org/apache/activemq/br...

Author: chirino
Date: Fri Mar 13 16:17:58 2009
New Revision: 753311

URL: http://svn.apache.org/viewvc?rev=753311&view=rev
Log:
- Added the abiliity to discriminate the protocol used by a connection.
- The BrokerConnection now uses a ProtocolHandler once the protocol is discriminated.


Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
    activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/multi
Removed:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=753311&r1=753310&r2=753311&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Fri Mar 13 16:17:58 2009
@@ -22,11 +22,7 @@
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.flow.Flow;
-import org.apache.activemq.flow.IFlowLimiter;
-import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
@@ -77,7 +73,7 @@
     protected void initialize() {
     }
     
-    protected final void write(final Object o) {
+    public final void write(final Object o) {
         if (blockingWriter==null) {
             try {
                 transport.oneway(o);
@@ -158,57 +154,8 @@
         return inputResumeThreshold;
     }
 
-    protected interface ProtocolLimiter<E> extends IFlowLimiter<E> {
-        public void onProtocolCredit(int credit);
-    }
-
-    protected class WindowLimiter<E> extends SizeLimiter<E> implements ProtocolLimiter<E> {
-        final Flow flow;
-        final boolean clientMode;
-        private int available;
-
-        public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) {
-            super(capacity, resumeThreshold);
-            this.clientMode = clientMode;
-            this.flow = flow;
-        }
-
-        public void reserve(E elem) {
-            super.reserve(elem);
-//            if (!clientMode) {
-//                 System.out.println(name + " Reserved " + this);
-//            }
-        }
-
-        public void releaseReserved(E elem) {
-            super.reserve(elem);
-//            if (!clientMode) {
-//                System.out.println(name + " Released Reserved " + this);
-//            }
-        }
-
-        protected void remove(int size) {
-            super.remove(size);
-            if (!clientMode) {
-                available += size;
-                if (available >= capacity - resumeThreshold) {
-                    sendCredit(available);
-                    available = 0;
-                }
-            }
-        }
-
-        protected void sendCredit(int credit) {
-            throw new UnsupportedOperationException("Please override this method to provide and implemenation.");
-        }
-
-        public void onProtocolCredit(int credit) {
-            remove(credit);
-        }
-
-        public int getElementSize(MessageDelivery m) {
-            return m.getFlowLimiterSize();
-        }
+    public Transport getTransport() {
+        return transport;
     }
 
 }

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java Fri Mar 13 16:17:58 2009
@@ -0,0 +1,57 @@
+/**
+ * 
+ */
+package org.apache.activemq;
+
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.SizeLimiter;
+
+public class WindowLimiter<E> extends SizeLimiter<E>  {
+        final Flow flow;
+        final boolean clientMode;
+        private int available;
+
+        public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) {
+            super(capacity, resumeThreshold);
+            this.clientMode = clientMode;
+            this.flow = flow;
+        }
+
+        public void reserve(E elem) {
+            super.reserve(elem);
+//            if (!clientMode) {
+//                 System.out.println(name + " Reserved " + this);
+//            }
+        }
+
+        public void releaseReserved(E elem) {
+            super.reserve(elem);
+//            if (!clientMode) {
+//                System.out.println(name + " Released Reserved " + this);
+//            }
+        }
+
+        protected void remove(int size) {
+            super.remove(size);
+            if (!clientMode) {
+                available += size;
+                if (available >= capacity - resumeThreshold) {
+                    sendCredit(available);
+                    available = 0;
+                }
+            }
+        }
+
+        protected void sendCredit(int credit) {
+            throw new UnsupportedOperationException("Please override this method to provide and implemenation.");
+        }
+
+        public void onProtocolCredit(int credit) {
+            remove(credit);
+        }
+
+        public int getElementSize(MessageDelivery m) {
+            return m.getFlowLimiterSize();
+        }
+    }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java?rev=753311&r1=753310&r2=753311&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java Fri Mar 13 16:17:58 2009
@@ -22,7 +22,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.Connection;
-import org.apache.activemq.broker.openwire.OpenwireBrokerConnection;
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.transport.DispatchableTransportServer;
 import org.apache.activemq.transport.Transport;
@@ -41,7 +40,8 @@
     final HashMap<Destination, Queue> queues = new HashMap<Destination, Queue>();
 
     private TransportServer transportServer;
-    private String uri;
+    private String bindUri;
+    private String connectUri;
     private String name;
     private IDispatcher dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean();
@@ -72,7 +72,7 @@
 
     public final void start() throws Exception {
         dispatcher.start();
-        transportServer = TransportFactory.bind(new URI(uri));
+        transportServer = TransportFactory.bind(new URI(bindUri));
         transportServer.setAcceptListener(this);
         if (transportServer instanceof DispatchableTransportServer) {
             ((DispatchableTransportServer) transportServer).setDispatcher(dispatcher);
@@ -85,7 +85,7 @@
     }
 
     public void onAccept(final Transport transport) {
-        OpenwireBrokerConnection connection = new OpenwireBrokerConnection();
+        BrokerConnection connection = new BrokerConnection();
         connection.setBroker(this);
         connection.setTransport(transport);
         connection.setPriorityLevels(MAX_PRIORITY);
@@ -115,12 +115,12 @@
         this.dispatcher = dispatcher;
     }
 
-    public String getUri() {
-        return uri;
+    public String getBindUri() {
+        return bindUri;
     }
 
-    public void setUri(String uri) {
-        this.uri = uri;
+    public void setBindUri(String uri) {
+        this.bindUri = uri;
     }
 
     public boolean isStopping() {
@@ -131,4 +131,14 @@
         return router;
     }
 
+
+    public String getConnectUri() {
+        return connectUri;
+    }
+
+
+    public void setConnectUri(String connectUri) {
+        this.connectUri = connectUri;
+    }
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java?rev=753311&r1=753310&r2=753311&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java Fri Mar 13 16:17:58 2009
@@ -17,10 +17,22 @@
 package org.apache.activemq.broker;
 
 import org.apache.activemq.Connection;
+import org.apache.activemq.Service;
+import org.apache.activemq.broker.openwire.OpenwireProtocolHandler;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.wireformat.WireFormat;
 
-abstract public class BrokerConnection extends Connection {
+public class BrokerConnection extends Connection {
     
     protected Broker broker;
+    private ProtocolHandler protocolHandler;
+
+    public interface ProtocolHandler extends Service {
+        public void setConnection(BrokerConnection connection);
+        public void onCommand(Object command);
+        public void onException(Exception error);
+        public void setWireFormat(WireFormat wf);
+    }
 
     public Broker getBroker() {
         return broker;
@@ -30,10 +42,52 @@
         this.broker = broker;
     }
     
+    
     @Override
     public boolean isStopping() {
         return super.isStopping() || broker.isStopping();
     }
     
+    public void onCommand(Object command) {
+        if( protocolHandler!=null ) {
+            protocolHandler.onCommand(command);
+        } else {
+            try {
+                WireFormat wf = (WireFormat) command;
+                if( wf.getClass() == OpenWireFormat.class ) {
+                    protocolHandler = new OpenwireProtocolHandler();
+                    protocolHandler.setConnection(this);
+                    protocolHandler.setWireFormat(wf);
+                    protocolHandler.start();
+                }
+            } catch (Exception e) {
+                onException(e);
+            }
+        }
+    }
+    
+    @Override
+    public void onException(Exception error) {
+        if( protocolHandler!=null ) {
+            protocolHandler.onException(error);
+        } else {
+            error.printStackTrace();
+            try {
+                stop();
+            } catch (Exception ignore) {
+            }
+        }
+    }
+    
+    @Override
+    public void stop() throws Exception {
+        super.stop();
+        if( protocolHandler!=null ) {
+            try {
+                protocolHandler.stop();
+            } catch (Exception ignore) {
+            }
+        }
+    }
     
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=753311&r1=753310&r2=753311&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Fri Mar 13 16:17:58 2009
@@ -34,7 +34,7 @@
 
     public Destination getDestination() {
         if( destination == null ) {
-            destination = OpenwireBrokerConnection.convert(message.getDestination());
+            destination = OpenwireProtocolHandler.convert(message.getDestination());
         }
         return destination;
     }

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri Mar 13 16:17:58 2009
@@ -0,0 +1,565 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.openwire;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
+import org.apache.activemq.WindowLimiter;
+import org.apache.activemq.broker.BrokerConnection;
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.broker.BrokerConnection.ProtocolHandler;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ControlCommand;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.FlushCommand;
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.LogicExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NoLocalExpression;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowController;
+import org.apache.activemq.flow.IFlowDrain;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.transport.WireFormatNegotiator;
+import org.apache.activemq.wireformat.WireFormat;
+
+public class OpenwireProtocolHandler implements ProtocolHandler {
+
+    protected final HashMap<ProducerId, ProducerContext> producers = new HashMap<ProducerId, ProducerContext>();
+    protected final HashMap<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
+
+    protected final Object inboundMutex = new Object();
+    protected IFlowController<MessageDelivery> inboundController;
+    
+    protected BrokerConnection connection;
+    private OpenWireFormat wireFormat; 
+    
+    public void start() throws Exception {
+        // Setup the inbound processing..
+        final Flow flow = new Flow("broker-"+connection.getName()+"-inbound", false);
+        SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
+        inboundController = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
+            public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+                route(controller, elem);
+            }
+        
+            public String toString() {
+                return flow.getFlowName();
+            }
+        }, flow, limiter, inboundMutex);
+    }
+
+    public void stop() throws Exception {
+    }
+
+    public void onCommand(Object o) {
+        
+        final Command command = (Command) o;
+        boolean responseRequired = command.isResponseRequired();
+        try {
+            command.visit(new CommandVisitor() {
+
+                // /////////////////////////////////////////////////////////////////
+                // Methods that keep track of the client state
+                // /////////////////////////////////////////////////////////////////
+                public Response processAddConnection(ConnectionInfo info) throws Exception {
+                    return ack(command);
+                }
+
+                public Response processAddSession(SessionInfo info) throws Exception {
+                    return ack(command);
+                }
+
+                public Response processAddProducer(ProducerInfo info) throws Exception {
+                    producers.put(info.getProducerId(), new ProducerContext(info));
+                    return ack(command);
+                }
+
+                public Response processAddConsumer(ConsumerInfo info) throws Exception {
+                    ConsumerContext ctx = new ConsumerContext(info);
+                    consumers.put(info.getConsumerId(), ctx);
+                    connection.getBroker().getRouter().bind(convert(info.getDestination()), ctx);
+                    return ack(command);
+                }
+
+                public Response processRemoveConnection(ConnectionId info, long arg1) throws Exception {
+                    return ack(command);
+                }
+
+                public Response processRemoveSession(SessionId info, long arg1) throws Exception {
+                    return ack(command);
+                }
+
+                public Response processRemoveProducer(ProducerId info) throws Exception {
+                    producers.remove(info);
+                    return ack(command);
+                }
+
+                public Response processRemoveConsumer(ConsumerId info, long arg1) throws Exception {
+                    return ack(command);
+                }
+
+                // /////////////////////////////////////////////////////////////////
+                // Message Processing Methods.
+                // /////////////////////////////////////////////////////////////////
+                public Response processMessage(Message info) throws Exception {
+                    ProducerId producerId = info.getProducerId();
+                    ProducerContext producerContext = producers.get(producerId);
+
+                    OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
+
+                    // Only producers that are not using a window will block,
+                    // and if it blocks.
+                    // yes we block the connection's read thread. yes other
+                    // sessions will not get
+                    // serviced while we block here. The producer is depending
+                    // on TCP flow
+                    // control to slow him down so we have to stop ready from
+                    // the socket at this
+                    // point.
+                    while (!producerContext.controller.offer(md, null)) {
+                        producerContext.controller.waitForFlowUnblock();
+                    }
+                    return null;
+                }
+
+                public Response processMessageAck(MessageAck info) throws Exception {
+                    ConsumerContext ctx = consumers.get(info.getConsumerId());
+                    ctx.ack(info);
+                    return ack(command);
+                }
+
+                // Only used when client prefetch is set to zero.
+                public Response processMessagePull(MessagePull info) throws Exception {
+                    return ack(command);
+                }
+
+                // /////////////////////////////////////////////////////////////////
+                // Control Methods
+                // /////////////////////////////////////////////////////////////////
+                public Response processWireFormat(WireFormatInfo info) throws Exception {
+                    
+                    // Negotiate the openwire encoding options.
+                    WireFormatNegotiator wfn = new WireFormatNegotiator(connection.getTransport(), wireFormat, 1);
+                    wfn.sendWireFormat();
+                    wfn.negociate(info);
+                    
+                    // Now that the encoding is negotiated.. let the client know the details about this
+                    // broker.
+                    BrokerInfo brokerInfo = new BrokerInfo();
+                    brokerInfo.setBrokerId(new BrokerId(connection.getBroker().getName()));
+                    brokerInfo.setBrokerName(connection.getBroker().getName());
+                    brokerInfo.setBrokerURL(connection.getBroker().getBindUri());
+                    connection.write(brokerInfo);
+                    return ack(command);
+                }
+
+                public Response processShutdown(ShutdownInfo info) throws Exception {
+                    return ack(command);
+                }
+
+                public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+                    return ack(command);
+                }
+
+                public Response processFlush(FlushCommand info) throws Exception {
+                    return ack(command);
+                }
+
+                public Response processConnectionControl(ConnectionControl info) throws Exception {
+                    return ack(command);
+                }
+
+                public Response processConnectionError(ConnectionError info) throws Exception {
+                    return ack(command);
+                }
+
+                public Response processConsumerControl(ConsumerControl info) throws Exception {
+                    return ack(command);
+                }
+
+                // /////////////////////////////////////////////////////////////////
+                // Methods for server management
+                // /////////////////////////////////////////////////////////////////
+                public Response processAddDestination(DestinationInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                public Response processRemoveDestination(DestinationInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                public Response processControlCommand(ControlCommand info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                // /////////////////////////////////////////////////////////////////
+                // Methods for transaction management
+                // /////////////////////////////////////////////////////////////////
+                public Response processBeginTransaction(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                public Response processEndTransaction(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                public Response processForgetTransaction(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                // /////////////////////////////////////////////////////////////////
+                // Methods for cluster operations
+                //    These commands are sent to the broker when it's acting like a 
+                //    client to another broker.
+                // /////////////////////////////////////////////////////////////////
+                public Response processBrokerInfo(BrokerInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                public Response processMessageDispatch(MessageDispatch info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+                
+                public Response processProducerAck(ProducerAck info) throws Exception {
+                    return ack(command);
+                }
+                
+            });
+        } catch (Exception e) {
+            if (responseRequired) {
+                ExceptionResponse response = new ExceptionResponse(e);
+                response.setCorrelationId(command.getCommandId());
+                connection.write(response);
+            } else {
+                connection.onException(e);
+            }
+
+        }
+    }
+    
+    public void onException(Exception error) {
+        if( !connection.isStopping() ) {
+            error.printStackTrace();
+            new Thread(){
+                @Override
+                public void run() {
+                    try {
+                        connection.stop();
+                    } catch (Exception ignore) {
+                    }
+                }
+            }.start();
+        }
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Internal Support Methods
+    // /////////////////////////////////////////////////////////////////
+
+    private Response ack(Command command) {
+        if (command.isResponseRequired()) {
+            Response rc = new Response();
+            rc.setCorrelationId(command.getCommandId());
+            connection.write(rc);
+        }
+        return null;
+    }
+
+    static class FlowControllableAdapter implements FlowControllable<MessageDelivery> {
+        public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+        }
+
+        public IFlowSink<MessageDelivery> getFlowSink() {
+            return null;
+        }
+
+        public IFlowSource<MessageDelivery> getFlowSource() {
+            return null;
+        }
+    }
+    
+    class ProducerContext {
+
+        private IFlowController<MessageDelivery> controller;
+        private String name;
+
+        public ProducerContext(final ProducerInfo info) {
+            this.name = info.getProducerId().toString();
+
+            // Openwire only uses credit windows at the producer level for
+            // producers that request the feature.
+            if (info.getWindowSize() > 0) {
+                final Flow flow = new Flow("broker-"+name+"-inbound", false);
+                WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize() / 2) {
+                    @Override
+                    protected void sendCredit(int credit) {
+                        ProducerAck ack = new ProducerAck(info.getProducerId(), credit);
+                        connection.write(ack);
+                    }
+                };
+
+                controller = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
+                    public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+                        route(controller, elem);
+                    }
+
+                    public String toString() {
+                        return flow.getFlowName();
+                    }
+                }, flow, limiter, inboundMutex);
+            } else {
+                controller = inboundController;
+            }
+        }
+    }
+
+    class ConsumerContext implements DeliveryTarget {
+
+        private final ConsumerInfo info;
+        private String name;
+        private BooleanExpression selector;
+
+        private SingleFlowRelay<MessageDelivery> queue;
+        public WindowLimiter<MessageDelivery> limiter;
+
+        public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException {
+            this.info = info;
+            this.name = info.getConsumerId().toString();
+            selector = parseSelector(info);
+
+            Flow flow = new Flow("broker-"+name+"-outbound", false);
+            limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize()/2) {
+                public int getElementSize(MessageDelivery m) {
+                    return 1;
+                }
+            };
+            queue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), limiter);
+            queue.setDrain(new IFlowDrain<MessageDelivery>() {
+                public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
+                    Message msg = message.asType(Message.class);
+                    MessageDispatch md = new MessageDispatch();
+                    md.setConsumerId(info.getConsumerId());
+                    md.setMessage(msg);
+                    md.setDestination(msg.getDestination());
+                    connection.write(md);
+                };
+            });
+        }
+
+        public void ack(MessageAck info) {
+            synchronized(queue) {
+                limiter.onProtocolCredit(info.getMessageCount());
+            }
+        }
+
+        public IFlowSink<MessageDelivery> getSink() {
+            return queue;
+        }
+
+        public boolean match(MessageDelivery message) {
+            Message msg = message.asType(Message.class);
+            if (msg == null) {
+                return false;
+            }
+
+            MessageEvaluationContext selectorContext = new MessageEvaluationContext();
+            selectorContext.setMessageReference(msg);
+            selectorContext.setDestination(info.getDestination());
+            try {
+                return (selector == null || selector.matches(selectorContext));
+            } catch (JMSException e) {
+                e.printStackTrace();
+                return false;
+            }
+        }
+
+    }
+
+    protected void route(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+        // TODO:
+        // Consider doing some caching of this target list. Most producers
+        // always send to
+        // the same destination.
+        Collection<DeliveryTarget> targets = connection.getBroker().getRouter().route(elem);
+
+        final Message message = ((OpenWireMessageDelivery) elem).getMessage();
+        if (targets != null) {
+
+            if (message.isResponseRequired()) {
+                // We need to ack the message once we ensure we won't loose it.
+                // We know we won't loose it once it's persisted or delivered to
+                // a consumer
+                // Setup a callback to get notifed once one of those happens.
+                if (message.isPersistent()) {
+                    elem.setCompletionCallback(new Runnable() {
+                        public void run() {
+                            ack(message);
+                        }
+                    });
+                } else {
+                    // Let the client know the broker got the message.
+                    ack(message);
+                }
+            }
+
+            // Deliver the message to all the targets..
+            for (DeliveryTarget dt : targets) {
+                if (dt.match(elem)) {
+                    dt.getSink().add(elem, controller);
+                }
+            }
+
+        } else {
+            // Let the client know we got the message even though there
+            // were no valid targets to deliver the message to.
+            if (message.isResponseRequired()) {
+                ack(message);
+            }
+        }
+        controller.elementDispatched(elem);
+    }
+
+    static public Destination convert(ActiveMQDestination dest) {
+        if (dest.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations();
+            ArrayList<Destination> d = new ArrayList<Destination>();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                d.add(convert(compositeDestinations[i]));
+            }
+            return new Destination.MultiDestination(d);
+        }
+        AsciiBuffer domain;
+        if (dest.isQueue()) {
+            domain = Router.QUEUE_DOMAIN;
+        }
+        if (dest.isTopic()) {
+            domain = Router.TOPIC_DOMAIN;
+        } else {
+            throw new IllegalArgumentException("Unsupported domain type: " + dest);
+        }
+        return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
+    }
+    
+    private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
+        BooleanExpression rc = null;
+        if (info.getSelector() != null) {
+            rc = SelectorParser.parse(info.getSelector());
+        }
+        if (info.isNoLocal()) {
+            if (rc == null) {
+                rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
+            } else {
+                rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
+            }
+        }
+        if (info.getAdditionalPredicate() != null) {
+            if (rc == null) {
+                rc = info.getAdditionalPredicate();
+            } else {
+                rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
+            }
+        }
+        return rc;
+    }
+
+    public BrokerConnection getConnection() {
+        return connection;
+    }
+
+    public void setConnection(BrokerConnection connection) {
+        this.connection = connection;
+    }
+
+    public void setWireFormat(WireFormat wireFormat) {
+        this.wireFormat = (OpenWireFormat) wireFormat;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java Fri Mar 13 16:17:58 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class ConcatInputStream extends InputStream {
+
+    private InputStream first;
+    private final InputStream second;
+
+    public ConcatInputStream(InputStream first, InputStream second) {
+        this.first = first;
+        this.second = second;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if( first!=null ) {
+            int rc = first.read();
+            if( rc >= 0 ) {
+                return rc;
+            }
+            first = null;
+        }
+        return second.read();
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java Fri Mar 13 16:17:58 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.util.ByteSequence;
+
+public class DiscriminatableOpenWireFormatFactory extends OpenWireFormatFactory implements DiscriminatableWireFormatFactory {
+
+    private static final byte MAGIC[] = new byte[] {1, 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q'};
+
+    public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+        if( byteSequence.length == 4+MAGIC.length ) {
+            for( int i=0; i < MAGIC.length; i++ ) {
+                if( byteSequence.data[i+4] != MAGIC[i] ) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
+    public int maxWireformatHeaderLength() {
+        return 4+MAGIC.length;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java Fri Mar 13 16:17:58 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+import org.apache.activemq.transport.stomp.StompWireFormatFactory;
+import org.apache.activemq.util.ByteSequence;
+
+public class DiscriminatableStompWireFormatFactory extends StompWireFormatFactory implements DiscriminatableWireFormatFactory {
+
+    public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+        return false;
+    }
+
+    public int maxWireformatHeaderLength() {
+        return 100;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java Fri Mar 13 16:17:58 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+import org.apache.activemq.util.ByteSequence;
+
+/**
+ * This should actually get merged into the WireFormatFactory class.  But to avoid change to much in the core right,
+ * now it's an additional interface. 
+ * 
+ */
+public interface DiscriminatableWireFormatFactory extends WireFormatFactory {
+
+    int maxWireformatHeaderLength();
+
+    boolean matchesWireformatHeader(ByteSequence byteSequence);
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Fri Mar 13 16:17:58 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+
+public class MultiWireFormatFactory implements WireFormatFactory{
+
+    static class MultiWireFormat implements WireFormat {
+
+        ArrayList<DiscriminatableWireFormatFactory> wireFormatFactories = new ArrayList<DiscriminatableWireFormatFactory>();
+        WireFormat wireFormat;
+        int maxHeaderLength;
+        
+        public int getVersion() {
+            return 0;
+        }
+        public boolean inReceive() {
+            return wireFormat.inReceive();
+        }
+        public void setVersion(int version) {
+            wireFormat.setVersion(version);
+        }
+
+        private ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        private ByteArrayInputStream peeked;
+        
+        public Object unmarshal(DataInput in) throws IOException {
+
+            while( wireFormat == null ) {
+                
+                int readByte = ((InputStream)in).read();
+                if( readByte < 0 ) {
+                    throw new EOFException();
+                }
+                baos.write(readByte);
+                
+                // Try to discriminate what we have read so far.
+                for (DiscriminatableWireFormatFactory wff : wireFormatFactories) {
+                    if( wff.matchesWireformatHeader(baos.toByteSequence()) ) {
+                        wireFormat = wff.createWireFormat();
+                        peeked = new ByteArrayInputStream(baos.toByteSequence());
+                        return wireFormat;
+                    }
+                }
+                
+                if( baos.size() >= maxHeaderLength ) {
+                    throw new IOException("Could not discriminate the protocol.");
+                }
+            }
+            
+            // If we have some peeked data we need to feed that back..  Only happens
+            // for the first few bytes of the protocol header.
+            if( peeked!=null ) {
+                in = new DataInputStream( new ConcatInputStream(peeked, (InputStream)in) );
+                Object rc = wireFormat.unmarshal(in);
+                if( peeked.available() <= 0 ) {
+                    peeked=null;
+                }
+                return rc;
+            }
+
+            return wireFormat.unmarshal(in);
+        }
+
+        
+        public void marshal(Object command, DataOutput out) throws IOException {
+            wireFormat.marshal(command, out);
+        }
+
+        public ByteSequence marshal(Object command) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+        public Object unmarshal(ByteSequence packet) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+        public ArrayList<DiscriminatableWireFormatFactory> getWireFormatFactories() {
+            return wireFormatFactories;
+        }
+        public void setWireFormatFactories(ArrayList<DiscriminatableWireFormatFactory> wireFormatFactories) {
+            this.wireFormatFactories = wireFormatFactories;
+            maxHeaderLength=0;
+            for (DiscriminatableWireFormatFactory wff : wireFormatFactories) {
+                maxHeaderLength = Math.max( maxHeaderLength, wff.maxWireformatHeaderLength());
+            }
+        }
+    }
+        
+    public WireFormat createWireFormat() {
+        MultiWireFormat rc = new MultiWireFormat();
+        ArrayList<DiscriminatableWireFormatFactory> wireFormatFactories = new ArrayList<DiscriminatableWireFormatFactory>();
+        wireFormatFactories.add(new DiscriminatableStompWireFormatFactory());
+        wireFormatFactories.add(new DiscriminatableOpenWireFormatFactory());
+        rc.setWireFormatFactories(wireFormatFactories);
+        return rc;
+    }
+
+}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java?rev=753311&r1=753310&r2=753311&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java Fri Mar 13 16:17:58 2009
@@ -59,8 +59,10 @@
     // set to force marshalling even in the NON tcp case.
     protected boolean forceMarshalling = false;
 
-    protected String sendBrokerURI;
-    protected String receiveBrokerURI;
+    protected String sendBrokerBindURI;
+    protected String receiveBrokerBindURI;
+    protected String sendBrokerConnectURI;
+    protected String receiveBrokerConnectURI;
 
     // Set's the number of threads to use:
     protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors();
@@ -101,16 +103,20 @@
         dispatcher = createDispatcher();
         dispatcher.start();
         if (tcp) {
-            sendBrokerURI = "tcp://localhost:10000";
-            receiveBrokerURI = "tcp://localhost:20000";
+            sendBrokerBindURI = "tcp://localhost:10000?wireFormat=multi";
+            receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=multi";
+            sendBrokerConnectURI = "tcp://localhost:10000";
+            receiveBrokerConnectURI = "tcp://localhost:20000";
         } else {
             if (forceMarshalling) {
-                sendBrokerURI = "pipe://SendBroker";
-                receiveBrokerURI = "pipe://ReceiveBroker";
+                sendBrokerBindURI = "pipe://SendBroker";
+                receiveBrokerBindURI = "pipe://ReceiveBroker";
             } else {
-                sendBrokerURI = "pipe://SendBroker";
-                receiveBrokerURI = "pipe://ReceiveBroker";
+                sendBrokerBindURI = "pipe://SendBroker";
+                receiveBrokerBindURI = "pipe://ReceiveBroker";
             }
+            sendBrokerConnectURI = sendBrokerBindURI;
+            receiveBrokerConnectURI = receiveBrokerBindURI;
         }
     }
 
@@ -370,12 +376,12 @@
     private void createConnections() throws IOException, URISyntaxException {
 
         if (multibroker) {
-            sendBroker = createBroker("SendBroker", sendBrokerURI);
-            rcvBroker = createBroker("RcvBroker", receiveBrokerURI);
+            sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI);
+            rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI);
             brokers.add(sendBroker);
             brokers.add(rcvBroker);
         } else {
-            sendBroker = rcvBroker = createBroker("Broker", sendBrokerURI);
+            sendBroker = rcvBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI);
             brokers.add(sendBroker);
         }
 
@@ -425,7 +431,7 @@
                 }
             }
         };
-        consumer.setUri(new URI(rcvBroker.getUri()));
+        consumer.setUri(new URI(rcvBroker.getConnectUri()));
         consumer.setDestination(destination);
         consumer.setName("consumer" + (i + 1));
         consumer.setTotalConsumerRate(totalConsumerRate);
@@ -442,7 +448,7 @@
                 }
             }
         };
-        producer.setUri(new URI(sendBroker.getUri()));
+        producer.setUri(new URI(sendBroker.getConnectUri()));
         producer.setProducerId(id + 1);
         producer.setName("producer" + (id + 1));
         producer.setDestination(destination);
@@ -463,10 +469,11 @@
         return queue;
     }
 
-    private Broker createBroker(String name, String uri) {
+    private Broker createBroker(String name, String bindURI, String connectUri) {
         Broker broker = new Broker();
         broker.setName(name);
-        broker.setUri(uri);
+        broker.setBindUri(bindURI);
+        broker.setConnectUri(connectUri);
         broker.setDispatcher(dispatcher);
         return broker;
     }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=753311&r1=753310&r2=753311&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java Fri Mar 13 16:17:58 2009
@@ -8,6 +8,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.Connection;
+import org.apache.activemq.WindowLimiter;
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.Router;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java?rev=753311&r1=753310&r2=753311&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java Fri Mar 13 16:17:58 2009
@@ -11,6 +11,7 @@
 import javax.jms.JMSException;
 
 import org.apache.activemq.Connection;
+import org.apache.activemq.WindowLimiter;
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.Router;

Added: activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/multi
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/multi?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/multi (added)
+++ activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/multi Fri Mar 13 16:17:58 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.wireformat.MultiWireFormatFactory