You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/12 21:18:26 UTC

svn commit: r752995 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ test/java/org/apache/activemq/broker/openwire/ test/java/org/apache/active...

Author: chirino
Date: Thu Mar 12 20:18:25 2009
New Revision: 752995

URL: http://svn.apache.org/viewvc?rev=752995&view=rev
Log:
Producer actually sends the broker messages now

Added:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/Openwire2Support.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
Removed:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=752995&r1=752994&r2=752995&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Thu Mar 12 20:18:25 2009
@@ -2,44 +2,33 @@
 
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.activemq.broker.DeliveryTarget;
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.IFlowLimiter;
-import org.apache.activemq.flow.IFlowRelay;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.Message;
 import org.apache.activemq.flow.SizeLimiter;
-import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 
-abstract public class Connection implements TransportListener, DeliveryTarget {
+abstract public class Connection implements TransportListener {
 
     protected Transport transport;
-
     protected String name;
 
     private int priorityLevels;
-
     protected final int outputWindowSize = 1000;
     protected final int outputResumeThreshold = 900;
-
     protected final int inputWindowSize = 1000;
     protected final int inputResumeThreshold = 500;
     
-    protected IFlowRelay<MessageDelivery> outputQueue;
-
     private IDispatcher dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean();
-    protected Flow outputFlow;
-    protected boolean blockingTransport = false;
-    protected ExecutorService blockingWriter;
+    private  ExecutorService blockingWriter;
 
     public void setTransport(Transport transport) {
         this.transport = transport;
@@ -47,6 +36,15 @@
 
     public void start() throws Exception {
         transport.setTransportListener(this);
+        if (transport instanceof DispatchableTransport) {
+            DispatchableTransport dt = ((DispatchableTransport) transport);
+            if (name != null) {
+                dt.setName(name);
+            }
+            dt.setDispatcher(getDispatcher());
+        } else {
+            blockingWriter = Executors.newSingleThreadExecutor();
+        }
         transport.start();
     }
 
@@ -64,8 +62,8 @@
     }
     
     protected final void write(final Object o) {
-        synchronized (outputQueue) {
-            if (!blockingTransport) {
+        synchronized (transport) {
+            if (blockingWriter==null) {
                 try {
                     transport.oneway(o);
                 } catch (IOException e) {
@@ -130,13 +128,6 @@
 
     public void setDispatcher(IDispatcher dispatcher) {
         this.dispatcher = dispatcher;
-        if (transport instanceof DispatchableTransport) {
-            DispatchableTransport dt = ((DispatchableTransport) transport);
-            if (name != null) {
-                dt.setName(name);
-            }
-            dt.setDispatcher(getDispatcher());
-        }
     }
 
     public int getOutputWindowSize() {
@@ -155,14 +146,6 @@
         return inputResumeThreshold;
     }
 
-    public IFlowRelay<MessageDelivery> getSink() {
-        return outputQueue;
-    }
-
-    public boolean match(MessageDelivery message) {
-        return true;
-    }
-
     protected interface ProtocolLimiter<E> extends IFlowLimiter<E> {
         public void onProtocolCredit(int credit);
     }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java?rev=752995&r1=752994&r2=752995&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java Thu Mar 12 20:18:25 2009
@@ -109,10 +109,6 @@
         this.uri = uri;
     }
 
-    public URI getConnectURI() {
-        return transportServer.getConnectURI();
-    }
-
     public boolean isStopping() {
         return stopping.get();
     }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java?rev=752995&r1=752994&r2=752995&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java Thu Mar 12 20:18:25 2009
@@ -50,26 +50,26 @@
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;
+import org.apache.activemq.flow.IFlowDrain;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.IFlowSource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.transport.DispatchableTransport;
 
 public class OpenwireBrokerConnection extends BrokerConnection {
-    
+
     protected final HashMap<ProducerId, ProducerContext> producers = new HashMap<ProducerId, ProducerContext>();
     protected final HashMap<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
-    
+
     protected final Object inboundMutex = new Object();
     protected IFlowController<MessageDelivery> inboundController;
-
-    protected IFlowController<MessageDelivery> outboundController;
-//    public ProtocolLimiter<MessageDelivery> outboundLimiter;
-    protected Flow ouboundFlow;
+//    private SingleFlowRelay<MessageDelivery> outboundQueue;
 
     public void onCommand(Object o) {
         final Command command = (Command) o;
@@ -77,152 +77,185 @@
         try {
             command.visit(new CommandVisitor() {
 
-                ///////////////////////////////////////////////////////////////////
+                // /////////////////////////////////////////////////////////////////
                 // Methods that keep track of the client state
-                ///////////////////////////////////////////////////////////////////
+                // /////////////////////////////////////////////////////////////////
                 public Response processAddConnection(ConnectionInfo info) throws Exception {
                     return ack(command);
                 }
+
                 public Response processAddSession(SessionInfo info) throws Exception {
                     return ack(command);
                 }
+
                 public Response processAddProducer(ProducerInfo info) throws Exception {
                     producers.put(info.getProducerId(), new ProducerContext(info));
                     return ack(command);
                 }
+
                 public Response processAddConsumer(ConsumerInfo info) throws Exception {
                     ConsumerContext ctx = new ConsumerContext(info);
                     consumers.put(info.getConsumerId(), ctx);
-                    
                     broker.getRouter().bind(convert(info.getDestination()), ctx);
-                    
-                    
                     return ack(command);
                 }
+
                 public Response processRemoveConnection(ConnectionId info, long arg1) throws Exception {
                     return ack(command);
                 }
+
                 public Response processRemoveSession(SessionId info, long arg1) throws Exception {
                     return ack(command);
                 }
+
                 public Response processRemoveProducer(ProducerId info) throws Exception {
                     producers.remove(info);
                     return ack(command);
                 }
+
                 public Response processRemoveConsumer(ConsumerId info, long arg1) throws Exception {
                     return ack(command);
                 }
 
-                ///////////////////////////////////////////////////////////////////
+                // /////////////////////////////////////////////////////////////////
                 // Message Processing Methods.
-                ///////////////////////////////////////////////////////////////////
+                // /////////////////////////////////////////////////////////////////
                 public Response processMessage(Message info) throws Exception {
                     ProducerId producerId = info.getProducerId();
                     ProducerContext producerContext = producers.get(producerId);
-                    
+
                     OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
-                    
-                    // Only producers that are not using a window will block, and if it blocks.
-                    // yes we block the connection's read thread.  yes other sessions will not get
-                    // serviced while we block here.  The producer is depending on TCP flow 
-                    // control to slow him down so we have to stop ready from the socket at this 
+
+                    // Only producers that are not using a window will block,
+                    // and if it blocks.
+                    // yes we block the connection's read thread. yes other
+                    // sessions will not get
+                    // serviced while we block here. The producer is depending
+                    // on TCP flow
+                    // control to slow him down so we have to stop reading from
+                    // the socket at this
                     // point.
-                    while( !producerContext.controller.offer(md, null) ) {
+                    while (!producerContext.controller.offer(md, null)) {
                         producerContext.controller.waitForFlowUnblock();
                     }
                     return null;
                 }
+
                 public Response processMessageAck(MessageAck info) throws Exception {
                     return ack(command);
                 }
+
+                // Only used when client prefetch is set to zero.
                 public Response processMessagePull(MessagePull info) throws Exception {
                     return ack(command);
                 }
-                public Response processProducerAck(ProducerAck info) throws Exception {
-                    return ack(command);
-                }
 
-                ///////////////////////////////////////////////////////////////////
+                // /////////////////////////////////////////////////////////////////
                 // Control Methods
-                ///////////////////////////////////////////////////////////////////
+                // /////////////////////////////////////////////////////////////////
                 public Response processWireFormat(WireFormatInfo info) throws Exception {
                     return ack(command);
                 }
+
                 public Response processShutdown(ShutdownInfo info) throws Exception {
                     return ack(command);
                 }
+
                 public Response processKeepAlive(KeepAliveInfo info) throws Exception {
                     return ack(command);
                 }
+
                 public Response processFlush(FlushCommand info) throws Exception {
                     return ack(command);
                 }
+
                 public Response processConnectionControl(ConnectionControl info) throws Exception {
                     return ack(command);
                 }
+
                 public Response processConnectionError(ConnectionError info) throws Exception {
                     return ack(command);
                 }
+
                 public Response processConsumerControl(ConsumerControl info) throws Exception {
                     return ack(command);
                 }
 
-                ///////////////////////////////////////////////////////////////////
+                // /////////////////////////////////////////////////////////////////
                 // Methods for server management
-                ///////////////////////////////////////////////////////////////////
+                // /////////////////////////////////////////////////////////////////
                 public Response processAddDestination(DestinationInfo info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
+
                 public Response processRemoveDestination(DestinationInfo info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
+
                 public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
+
                 public Response processControlCommand(ControlCommand info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
 
-                ///////////////////////////////////////////////////////////////////
+                // /////////////////////////////////////////////////////////////////
                 // Methods for transaction management
-                ///////////////////////////////////////////////////////////////////
+                // /////////////////////////////////////////////////////////////////
                 public Response processBeginTransaction(TransactionInfo info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
+
                 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
+
                 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
+
                 public Response processEndTransaction(TransactionInfo info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
+
                 public Response processForgetTransaction(TransactionInfo info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
+
                 public Response processPrepareTransaction(TransactionInfo info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
+
                 public Response processRecoverTransactions(TransactionInfo info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
+
                 public Response processRollbackTransaction(TransactionInfo info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
 
-                ///////////////////////////////////////////////////////////////////
+                // /////////////////////////////////////////////////////////////////
                 // Methods for cluster operations
-                ///////////////////////////////////////////////////////////////////
+                //    These commands are sent to the broker when it's acting like a 
+                //    client to another broker.
+                // /////////////////////////////////////////////////////////////////
                 public Response processBrokerInfo(BrokerInfo info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
+
                 public Response processMessageDispatch(MessageDispatch info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
+
                 public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
+                
+                public Response processProducerAck(ProducerAck info) throws Exception {
+                    return ack(command);
+                }
+                
             });
         } catch (Exception e) {
             if (responseRequired) {
@@ -232,34 +265,33 @@
             } else {
                 onException(e);
             }
-            
+
         }
     }
 
-
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     // Internal Support Methods
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
 
     private Response ack(Command command) {
-        Response  rc = null;
-        if( command.isResponseRequired() ) {
-            rc = new Response();
+        if (command.isResponseRequired()) {
+            Response rc = new Response();
             rc.setCorrelationId(command.getCommandId());
+            write(rc);
         }
-        return rc;
+        return null;
     }
-    
+
     @Override
     public void start() throws Exception {
         super.start();
         BrokerInfo info = new BrokerInfo();
         info.setBrokerId(new BrokerId(broker.getName()));
         info.setBrokerName(broker.getName());
-        info.setBrokerURL(broker.getConnectURI().toString());
+        info.setBrokerURL(broker.getUri());
         write(info);
     }
-    
+
     static class FlowControllableAdapter implements FlowControllable<MessageDelivery> {
         public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
         }
@@ -273,17 +305,78 @@
         }
     }
     
-    
+    class ProducerContext {
+
+        private final ProducerInfo info;
+        private IFlowController<MessageDelivery> controller;
+        private String name;
+
+        public ProducerContext(final ProducerInfo info) {
+            this.info = info;
+            this.name = info.getProducerId().toString();
+
+            // Openwire only uses credit windows at the producer level for
+            // producers that request the feature.
+            if (info.getWindowSize() > 0) {
+                Flow flow = new Flow(info.getProducerId().toString(), false);
+                WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize() / 2) {
+                    @Override
+                    protected void sendCredit(int credit) {
+                        ProducerAck ack = new ProducerAck(info.getProducerId(), credit);
+                        write(ack);
+                    }
+                };
+
+                controller = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
+                    public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+                        route(controller, elem);
+                    }
+
+                    public String toString() {
+                        return name;
+                    }
+                }, flow, limiter, inboundMutex);
+            } else {
+                controller = inboundController;
+            }
+        }
+    }
+
     class ConsumerContext implements DeliveryTarget {
-        
+
         private final ConsumerInfo info;
         private String name;
         private BooleanExpression selector;
 
+        private SingleFlowRelay<MessageDelivery> queue;
+        public ProtocolLimiter<MessageDelivery> limiter;
+
         public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException {
             this.info = info;
             this.name = info.getConsumerId().toString();
             selector = parseSelector(info);
+
+            Flow flow = new Flow(name, false);
+            limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) {
+                public int getElementSize(MessageDelivery m) {
+                    return 1;
+                }
+            };
+            queue = new SingleFlowRelay<MessageDelivery>(flow, name + "-outbound", limiter);
+            if (transport instanceof DispatchableTransport) {
+                queue.setDrain(new IFlowDrain<MessageDelivery>() {
+                    public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) {
+                        write(message);
+                    }
+                });
+
+            } else {
+                queue.setDrain(new IFlowDrain<MessageDelivery>() {
+                    public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
+                        write(message);
+                    };
+                });
+            }
         }
 
         public IFlowSink<MessageDelivery> getSink() {
@@ -293,10 +386,10 @@
 
         public boolean match(MessageDelivery message) {
             Message msg = message.asType(Message.class);
-            if( msg ==null ) {
+            if (msg == null) {
                 return false;
             }
-            
+
             MessageEvaluationContext selectorContext = new MessageEvaluationContext();
             selectorContext.setMessageReference(msg);
             selectorContext.setDestination(info.getDestination());
@@ -307,82 +400,26 @@
                 return false;
             }
         }
-        
-    }
 
-    private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
-        BooleanExpression rc = null;
-        if (info.getSelector() != null) {
-            rc = SelectorParser.parse(info.getSelector());
-        }
-        if (info.isNoLocal()) {
-            if (rc == null) {
-                rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
-            } else {
-                rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
-            }
-        }
-        if (info.getAdditionalPredicate() != null) {
-            if (rc == null) {
-                rc = info.getAdditionalPredicate();
-            } else {
-                rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
-            }
-        }
-        return rc;
     }
 
-    class ProducerContext {
-        
-        private final ProducerInfo info;
-        private IFlowController<MessageDelivery> controller;
-        private String name;
-
-        public ProducerContext(final ProducerInfo info) {
-            this.info = info;
-            this.name = info.getProducerId().toString();
-            
-            // Openwire only uses credit windows at the producer level for producers that request the feature.
-            if( info.getWindowSize() > 0 ) {
-                Flow flow = new Flow(info.getProducerId().toString(), false);
-                
-                WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize()/2) {
-                    @Override
-                    protected void sendCredit(int credit) {
-                        ProducerAck ack = new ProducerAck(info.getProducerId(), credit);
-                        write(ack);
-                    }
-                };
-                
-                controller = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
-                    public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
-                        route(controller, elem);
-                    }
-                    public String toString() {
-                        return name;
-                    }
-                }, flow, limiter, inboundMutex);
-            } else {
-                controller = inboundController;
-            }
-        }
-    }
-    
     protected void route(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
         // TODO:
-        // Consider doing some caching of this target list. Most producers always send to 
+        // Consider doing some caching of this target list. Most producers
+        // always send to
         // the same destination.
         Collection<DeliveryTarget> targets = broker.getRouter().route(elem);
-        
-        final Message message = ((OpenWireMessageDelivery)elem).getMessage();
-        if( targets != null ) { 
-            
-            if( message.isResponseRequired() ) {
+
+        final Message message = ((OpenWireMessageDelivery) elem).getMessage();
+        if (targets != null) {
+
+            if (message.isResponseRequired()) {
                 // We need to ack the message once we ensure we won't loose it.
-                // We know we won't loose it once it's persisted or delivered to a consumer
+                // We know we won't lose it once it's persisted or delivered to
+                // a consumer
                 // Setup a callback to get notifed once one of those happens.
-                if( message.isPersistent() ) {
-                    elem.setCompletionCallback(new Runnable(){
+                if (message.isPersistent()) {
+                    elem.setCompletionCallback(new Runnable() {
                         public void run() {
                             ack(message);
                         }
@@ -392,81 +429,90 @@
                     ack(message);
                 }
             }
-            
+
             // Deliver the message to all the targets..
             for (DeliveryTarget dt : targets) {
                 if (dt.match(elem)) {
                     dt.getSink().add(elem, controller);
                 }
             }
-            
+
         } else {
-            // Let the client know we got the message even though there 
+            // Let the client know we got the message even though there
             // were no valid targets to deliver the message to.
-            if( message.isResponseRequired() ) {
+            if (message.isResponseRequired()) {
                 ack(message);
             }
         }
         controller.elementDispatched(elem);
     }
-    
+
     protected void initialize() {
-        
-        // Setup the input processing..
+
+        // Setup the inbound processing..
         Flow flow = new Flow(name, false);
         SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(inputWindowSize, inputResumeThreshold);
         inboundController = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
             public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
                 route(controller, elem);
             }
+
             public String toString() {
                 return name;
             }
         }, flow, limiter, inboundMutex);
 
-//        ouboundFlow = new Flow(name, false);
-//        outboundLimiter = new WindowLimiter<MessageDelivery>(true, ouboundFlow, outputWindowSize, outputResumeThreshold);
-//        outputQueue = new SingleFlowRelay<MessageDelivery>(ouboundFlow, name + "-outbound", outboundLimiter);
-//        outboundController = outputQueue.getFlowController(ouboundFlow);
-//
-//        if (transport instanceof DispatchableTransport) {
-//            outputQueue.setDrain(new IFlowDrain<MessageDelivery>() {
-//
-//                public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) {
-//                    write(message);
-//                }
-//            });
-//
-//        } else {
-//            blockingTransport = true;
-//            blockingWriter = Executors.newSingleThreadExecutor();
-//            outputQueue.setDrain(new IFlowDrain<MessageDelivery>() {
-//                public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
-//                    write(message);
-//                };
-//            });
-//
-//        }
+//        Flow ouboundFlow = new Flow(name, false);
+//        SizeLimiter<MessageDelivery> outboundLimiter = new SizeLimiter<MessageDelivery>(outputWindowSize, outputResumeThreshold);
+//        outboundQueue = new SingleFlowRelay<MessageDelivery>(ouboundFlow, name + "-outbound", outboundLimiter);
+//        outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+//            public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
+//                write(message);
+//            };
+//        });
     }
-    
+
     static public Destination convert(ActiveMQDestination dest) {
-        if( dest.isComposite() ) {
+        if (dest.isComposite()) {
             ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations();
-            ArrayList<Destination> d= new ArrayList<Destination>();
+            ArrayList<Destination> d = new ArrayList<Destination>();
             for (int i = 0; i < compositeDestinations.length; i++) {
                 d.add(convert(compositeDestinations[i]));
             }
             return new Destination.MultiDestination(d);
-        } 
+        }
         AsciiBuffer domain;
-        if( dest.isQueue() ) {
+        if (dest.isQueue()) {
             domain = Router.QUEUE_DOMAIN;
-        } if( dest.isTopic() ) {
+        }
+        if (dest.isTopic()) {
             domain = Router.TOPIC_DOMAIN;
         } else {
-            throw new IllegalArgumentException("Unsupported domain type: "+ dest);
+            throw new IllegalArgumentException("Unsupported domain type: " + dest);
         }
-        return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName())); 
+        return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
+    }
+    
+    private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
+        BooleanExpression rc = null;
+        if (info.getSelector() != null) {
+            rc = SelectorParser.parse(info.getSelector());
+        }
+        if (info.isNoLocal()) {
+            if (rc == null) {
+                rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
+            } else {
+                rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
+            }
+        }
+        if (info.getAdditionalPredicate() != null) {
+            if (rc == null) {
+                rc = info.getAdditionalPredicate();
+            } else {
+                rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
+            }
+        }
+        return rc;
     }
 
 }

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/Openwire2Support.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/Openwire2Support.java?rev=752995&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/Openwire2Support.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/Openwire2Support.java Thu Mar 12 20:18:25 2009
@@ -0,0 +1,82 @@
+package org.apache.activemq.broker.openwire;
+
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.SessionInfo;
+
+/**
+ * Static factory helpers that build the OpenWire command objects
+ * (connection, session, producer, consumer, message and ack) used by the
+ * tests in this package.
+ *
+ * The id counters are plain static fields incremented without any
+ * synchronization.
+ */
+public class Openwire2Support {
+
+    // Shared by connection, session, producer and consumer ids.
+    static private long idGenerator;
+    // Sequence used for message ids only.
+    static private long msgIdGenerator;
+
+    /**
+     * Creates a non-browser consumer on the given destination with a prefetch
+     * size of 1000 and synchronous dispatch.
+     *
+     * @param sessionInfo the session that owns the consumer
+     * @param destination the destination to consume from
+     * @return the new consumer info
+     */
+    public static ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception {
+        ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
+        info.setBrowser(false);
+        info.setDestination(destination);
+        info.setPrefetchSize(1000);
+        info.setDispatchAsync(false);
+        return info;
+    }
+
+    /**
+     * Returns the remove command created by the given consumer info.
+     *
+     * @param consumerInfo the consumer to close
+     * @return the remove command for that consumer
+     */
+    public static RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) {
+        return consumerInfo.createRemoveCommand();
+    }
+
+    /**
+     * Creates a producer in the given session using the next generated id.
+     *
+     * @param sessionInfo the session that owns the producer
+     * @return the new producer info
+     */
+    public static ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
+        ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
+        return info;
+    }
+
+    /**
+     * Creates a session on the given connection using the next generated id.
+     *
+     * @param connectionInfo the connection that owns the session
+     * @return the new session info
+     */
+    public static SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
+        SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
+        return info;
+    }
+
+    /**
+     * Creates a connection whose id is "connection:" followed by the next
+     * generated id; the client id is set to the same value.
+     *
+     * @return the new connection info
+     */
+    public static ConnectionInfo createConnectionInfo() throws Exception {
+        ConnectionInfo info = new ConnectionInfo();
+        info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
+        info.setClientId(info.getConnectionId().getValue());
+        return info;
+    }
+
+    /**
+     * Creates a non-persistent text message with priority 4 and no payload.
+     *
+     * @param producerInfo the producer the message is attributed to
+     * @param destination the destination of the message
+     * @return the new message
+     */
+    public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
+        return createMessage(producerInfo, destination, 4, null);
+    }
+
+    /**
+     * Creates a non-persistent text message with the next message sequence id.
+     *
+     * @param producerInfo the producer the message is attributed to
+     * @param destination the destination of the message
+     * @param priority the JMS priority to set
+     * @param payload the message text, or null to leave the text unset
+     * @return the new message
+     * @throws IllegalStateException if the text cannot be set on the new message
+     */
+    public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int priority, String payload) {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setJMSPriority(priority);
+        message.setProducerId(producerInfo.getProducerId());
+        message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+        message.setDestination(destination);
+        message.setPersistent(false);
+        if (payload != null) {
+            try {
+                message.setText(payload);
+            } catch (MessageNotWriteableException e) {
+                // A message created a few lines above should still be writable;
+                // fail loudly instead of returning a message without its payload.
+                throw new IllegalStateException("Could not set the text of a newly created message", e);
+            }
+        }
+        return message;
+    }
+
+    /**
+     * Creates an ack of the given type for a consumer, using the destination
+     * and id of the supplied message as the last acknowledged message.
+     *
+     * @param consumerInfo the acknowledging consumer
+     * @param msg the last message covered by the ack
+     * @param count the message count to set on the ack
+     * @param ackType the ack type to set on the ack
+     * @return the new ack
+     */
+    public static MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) {
+        MessageAck ack = new MessageAck();
+        ack.setAckType(ackType);
+        ack.setConsumerId(consumerInfo.getConsumerId());
+        ack.setDestination(msg.getDestination());
+        ack.setLastMessageId(msg.getMessageId());
+        ack.setMessageCount(count);
+        return ack;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java?rev=752995&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java Thu Mar 12 20:18:25 2009
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.openwire;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Queue;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.PriorityDispatcher;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.Period;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.Mapper;
+
+public class OpenwireBrokerTest extends TestCase {
+
+    protected static final int PERFORMANCE_SAMPLES = 3000000;
+
+    protected static final int IO_WORK_AMOUNT = 0;
+    protected static final int FANIN_COUNT = 10;
+    protected static final int FANOUT_COUNT = 10;
+
+    protected static final int PRIORITY_LEVELS = 10;
+    protected static final boolean USE_INPUT_QUEUES = true;
+
+    // Set to put senders and consumers on separate brokers.
+    protected boolean multibroker = false;
+
+    // Set to mock up ptp (point-to-point) messaging:
+    protected boolean ptp = false;
+
+    // Set to use tcp IO
+    protected boolean tcp = true;
+    // set to force marshalling even in the NON tcp case.
+    protected boolean forceMarshalling = false;
+
+    protected String sendBrokerURI;
+    protected String receiveBrokerURI;
+
+    // Sets the number of threads to use:
+    protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors();
+    protected boolean usePartitionedQueue = false;
+
+    protected int producerCount;
+    protected int consumerCount;
+    protected int destCount;
+
+    protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
+    protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
+
+    protected Broker sendBroker;
+    protected Broker rcvBroker;
+    protected ArrayList<Broker> brokers = new ArrayList<Broker>();
+    protected IDispatcher dispatcher;
+    protected final AtomicLong msgIdGenerator = new AtomicLong();
+
+    final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
+    final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
+
+    /** Maps a delivery to its message id. */
+    static public final Mapper<AsciiBuffer, MessageDelivery> KEY_MAPPER = new Mapper<AsciiBuffer, MessageDelivery>() {
+        public AsciiBuffer map(MessageDelivery delivery) {
+            final AsciiBuffer messageId = delivery.getMsgId();
+            return messageId;
+        }
+    };
+    /** Maps a delivery to a partition number in the range 0..9 derived from its producer id. */
+    static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
+        public Integer map(MessageDelivery element) {
+            // we modulo 10 to have at most 10 partitions which the producers
+            // get split across. The remainder takes the sign of the hash code,
+            // so take its absolute value to stay within 0..9 (the remainder is
+            // always in -9..9, so Math.abs cannot overflow here).
+            return Math.abs(element.getProducerId().hashCode() % 10);
+        }
+    };
+
+    /**
+     * Creates and starts the dispatcher, then selects the broker URIs for the
+     * configured transport (tcp or pipe).
+     */
+    @Override
+    protected void setUp() throws Exception {
+        dispatcher = createDispatcher();
+        dispatcher.start();
+        if (tcp) {
+            sendBrokerURI = "tcp://localhost:10000";
+            receiveBrokerURI = "tcp://localhost:20000";
+        } else {
+            // NOTE(review): forceMarshalling selected between two identical
+            // pair of pipe URIs here, so it has no effect on the URIs chosen;
+            // confirm which URI option is meant to enable marshalling.
+            sendBrokerURI = "pipe://SendBroker";
+            receiveBrokerURI = "pipe://ReceiveBroker";
+        }
+    }
+
+    /**
+     * Creates the priority dispatch pool named "BrokerDispatcher", sized by
+     * asyncThreadPoolSize and Broker.MAX_PRIORITY.
+     */
+    protected IDispatcher createDispatcher() {
+        final String poolName = "BrokerDispatcher";
+        return PriorityDispatcher.createPriorityDispatchPool(poolName, Broker.MAX_PRIORITY, asyncThreadPoolSize);
+    }
+    
+    /**
+     * Reports rates for FANIN_COUNT producers and FANOUT_COUNT consumers
+     * spread over FANIN_COUNT destinations.
+     *
+     * @throws Exception if the services cannot be created, started or stopped
+     */
+    public void test_10_10_10() throws Exception {
+        // The counts follow the producers_destinations_consumers pattern of the
+        // test name, like the other tests in this class.
+        producerCount = FANIN_COUNT;
+        destCount = FANIN_COUNT;
+        consumerCount = FANOUT_COUNT;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Reports rates for one producer on one destination; consumerCount is not
+     * set by this test.
+     */
+    public void test_1_1_0() throws Exception {
+        destCount = 1;
+        producerCount = 1;
+        createConnections();
+
+        // Bring everything up, sample the rates, and always stop afterwards.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /** Reports rates for one producer, one destination and one consumer. */
+    public void test_1_1_1() throws Exception {
+        consumerCount = 1;
+        destCount = 1;
+        producerCount = 1;
+        createConnections();
+
+        // Bring everything up, sample the rates, and always stop afterwards.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Reports rates for FANIN_COUNT producers and FANOUT_COUNT consumers
+     * sharing a single destination.
+     */
+    public void test_10_1_10() throws Exception {
+        destCount = 1;
+        producerCount = FANIN_COUNT;
+        consumerCount = FANOUT_COUNT;
+        createConnections();
+
+        // Bring everything up, sample the rates, and always stop afterwards.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Reports rates for FANIN_COUNT producers feeding a single consumer
+     * through one destination.
+     */
+    public void test_10_1_1() throws Exception {
+        consumerCount = 1;
+        destCount = 1;
+        producerCount = FANIN_COUNT;
+        createConnections();
+
+        // Bring everything up, sample the rates, and always stop afterwards.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Reports rates for a single producer feeding FANOUT_COUNT consumers
+     * through one destination.
+     */
+    public void test_1_1_10() throws Exception {
+        consumerCount = FANOUT_COUNT;
+        destCount = 1;
+        producerCount = 1;
+        createConnections();
+
+        // Bring everything up, sample the rates, and always stop afterwards.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /** Reports rates for two producers, two destinations and two consumers. */
+    public void test_2_2_2() throws Exception {
+        consumerCount = 2;
+        destCount = 2;
+        producerCount = 2;
+        createConnections();
+
+        // Bring everything up, sample the rates, and always stop afterwards.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Reports rates for two producers, two destinations and two consumers
+     * where the first consumer is given a think time of 50, making it the slow
+     * one. Nothing is asserted; the rates are only printed.
+     *
+     * @throws Exception if the services cannot be created, started or stopped
+     */
+    public void test_2_2_2_SlowConsumer() throws Exception {
+        consumerCount = 2;
+        destCount = 2;
+        producerCount = 2;
+        createConnections();
+
+        // Slow down the first consumer only.
+        RemoteConsumer slowConsumer = consumers.get(0);
+        slowConsumer.setThinkTime(50);
+
+        // Bring everything up, sample the rates, and always stop afterwards.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Reports rates for two producers, two destinations and two consumers
+     * where producer i sets the property "match" + i and consumer i uses the
+     * same string as its selector.
+     */
+    public void test_2_2_2_Selector() throws Exception {
+        consumerCount = 2;
+        destCount = 2;
+        producerCount = 2;
+        createConnections();
+
+        // Pair each consumer with the producer at the same index.
+        int index = 0;
+        while (index < consumerCount) {
+            String matchKey = "match" + index;
+            consumers.get(index).setSelector(matchKey);
+            producers.get(index).setProperty(matchKey);
+            index++;
+        }
+
+        // Bring everything up, sample the rates, and always stop afterwards.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Runs two producers against one destination and one consumer. The first
+     * producer is given priority 1 and its rate is labelled "High Priority
+     * Producer Rate" so it can be compared with the aggregate rates. Nothing
+     * is asserted; the rates are only printed.
+     *
+     * @throws Exception if the services cannot be created, started or stopped
+     */
+    public void test_2_1_1_HighPriorityProducer() throws Exception {
+        consumerCount = 1;
+        destCount = 1;
+        producerCount = 2;
+        createConnections();
+
+        RemoteProducer producer = producers.get(0);
+        producer.setPriority(1);
+        producer.getRate().setName("High Priority Producer Rate");
+
+        consumers.get(0).setThinkTime(1);
+
+        // Bring everything up, sample the rates, and always stop afterwards.
+        startServices();
+        try {
+            System.out.println("Checking rates for test: " + getName());
+            int sample = 0;
+            while (sample < PERFORMANCE_SAMPLES) {
+                Period period = new Period();
+                Thread.sleep(1000 * 5);
+                System.out.println(producer.getRate().getRateSummary(period));
+                System.out.println(totalProducerRate.getRateSummary(period));
+                System.out.println(totalConsumerRate.getRateSummary(period));
+                totalProducerRate.reset();
+                totalConsumerRate.reset();
+                sample++;
+            }
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Same setup as the high priority producer test, except that the first
+     * producer also gets a priority mod of 3 (presumably so that only some of
+     * its messages use the raised priority; confirm against RemoteProducer).
+     * Nothing is asserted; the rates are only printed.
+     *
+     * @throws Exception if the services cannot be created, started or stopped
+     */
+    public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
+        consumerCount = 1;
+        destCount = 1;
+        producerCount = 2;
+        createConnections();
+
+        RemoteProducer producer = producers.get(0);
+        producer.setPriority(1);
+        producer.setPriorityMod(3);
+        producer.getRate().setName("High Priority Producer Rate");
+
+        consumers.get(0).setThinkTime(1);
+
+        // Bring everything up, sample the rates, and always stop afterwards.
+        startServices();
+        try {
+            System.out.println("Checking rates for test: " + getName());
+            int sample = 0;
+            while (sample < PERFORMANCE_SAMPLES) {
+                Period period = new Period();
+                Thread.sleep(1000 * 5);
+                System.out.println(producer.getRate().getRateSummary(period));
+                System.out.println(totalProducerRate.getRateSummary(period));
+                System.out.println(totalConsumerRate.getRateSummary(period));
+                totalProducerRate.reset();
+                totalConsumerRate.reset();
+                sample++;
+            }
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Prints the aggregate producer and consumer rate summaries every five
+     * seconds, PERFORMANCE_SAMPLES times, resetting both aggregates after each
+     * sample.
+     *
+     * @throws InterruptedException if the thread is interrupted while sleeping
+     */
+    private void reportRates() throws InterruptedException {
+        String mode = ptp ? "ptp" : "topic";
+        System.out.println("Checking rates for test: " + getName() + ", " + mode);
+
+        int sample = 0;
+        while (sample < PERFORMANCE_SAMPLES) {
+            Period period = new Period();
+            Thread.sleep(1000 * 5);
+            System.out.println(totalProducerRate.getRateSummary(period));
+            System.out.println(totalConsumerRate.getRateSummary(period));
+            totalProducerRate.reset();
+            totalConsumerRate.reset();
+            sample++;
+        }
+    }
+
+    /**
+     * Creates the broker(s), the destinations (plus their queues in ptp mode)
+     * and the configured number of producers and consumers. Producers and
+     * consumers are assigned to destinations round-robin by index.
+     */
+    private void createConnections() throws IOException, URISyntaxException {
+
+        // A single broker serves both directions unless multibroker is set.
+        if (!multibroker) {
+            sendBroker = rcvBroker = createBroker("Broker", sendBrokerURI);
+            brokers.add(sendBroker);
+        } else {
+            sendBroker = createBroker("SendBroker", sendBrokerURI);
+            rcvBroker = createBroker("RcvBroker", receiveBrokerURI);
+            brokers.add(sendBroker);
+            brokers.add(rcvBroker);
+        }
+
+        Destination[] dests = new Destination[destCount];
+
+        for (int d = 0; d < destCount; d++) {
+            Destination.SingleDestination single = new Destination.SingleDestination();
+            single.setName(new AsciiBuffer("dest" + (d + 1)));
+            single.setDomain(ptp ? Router.QUEUE_DOMAIN : Router.TOPIC_DOMAIN);
+            dests[d] = single;
+
+            // Queues are only registered in ptp mode.
+            if (!ptp) {
+                continue;
+            }
+            sendBroker.addQueue(createQueue(sendBroker, dests[d]));
+            if (multibroker) {
+                rcvBroker.addQueue(createQueue(rcvBroker, dests[d]));
+            }
+        }
+
+        for (int p = 0; p < producerCount; p++) {
+            producers.add(createProducer(p, dests[p % destCount]));
+        }
+
+        for (int c = 0; c < consumerCount; c++) {
+            consumers.add(createConsumer(c, dests[c % destCount]));
+        }
+
+        // Create MultiBroker connections:
+        // if (multibroker) {
+        // Pipe<Message> pipe = new Pipe<Message>();
+        // sendBroker.createBrokerConnection(rcvBroker, pipe);
+        // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
+        // }
+    }
+
+    /**
+     * Creates a consumer named "consumer" + (i + 1) that connects to the
+     * receive broker's URI and consumes from the given destination.
+     *
+     * @throws URISyntaxException if the receive broker's URI is malformed
+     */
+    private RemoteConsumer createConsumer(int i, Destination destination) throws URISyntaxException {
+        RemoteConsumer remote = new RemoteConsumer();
+        URI brokerUri = new URI(rcvBroker.getUri());
+        remote.setUri(brokerUri);
+        remote.setDestination(destination);
+        String label = "consumer" + (i + 1);
+        remote.setName(label);
+        remote.setTotalConsumerRate(totalConsumerRate);
+        remote.setDispatcher(dispatcher);
+        return remote;
+    }
+
+    /**
+     * Creates a producer with id (id + 1), named "producer" + (id + 1), that
+     * connects to the send broker's URI and sends to the given destination.
+     *
+     * @throws URISyntaxException if the send broker's URI is malformed
+     */
+    private RemoteProducer createProducer(int id, Destination destination) throws URISyntaxException {
+        RemoteProducer remote = new RemoteProducer();
+        URI brokerUri = new URI(sendBroker.getUri());
+        remote.setUri(brokerUri);
+        int ordinal = id + 1;
+        remote.setProducerId(ordinal);
+        remote.setName("producer" + ordinal);
+        remote.setDestination(destination);
+        remote.setMessageIdGenerator(msgIdGenerator);
+        remote.setTotalProducerRate(totalProducerRate);
+        remote.setDispatcher(dispatcher);
+        return remote;
+    }
+
+    /**
+     * Creates a queue for the destination on the given broker, keyed by
+     * KEY_MAPPER and, when usePartitionedQueue is set, partitioned by
+     * PARTITION_MAPPER.
+     */
+    private Queue createQueue(Broker broker, Destination destination) {
+        Queue created = new Queue();
+        created.setBroker(broker);
+        created.setDestination(destination);
+        created.setKeyExtractor(KEY_MAPPER);
+        if (!usePartitionedQueue) {
+            return created;
+        }
+        created.setPartitionMapper(PARTITION_MAPPER);
+        return created;
+    }
+
+    /** Creates a broker with the given name and URI that uses this test's dispatcher. */
+    private Broker createBroker(String name, String uri) {
+        final Broker created = new Broker();
+        created.setName(name);
+        created.setUri(uri);
+        created.setDispatcher(dispatcher);
+        return created;
+    }
+
+    /**
+     * Stops every producer, consumer and broker and then shuts the dispatcher
+     * down. A failure while stopping one service does not prevent the
+     * remaining services from being stopped; the first failure is rethrown
+     * once everything has been attempted.
+     *
+     * @throws Exception the first failure raised while stopping a service
+     */
+    private void stopServices() throws Exception {
+        Exception firstFailure = null;
+        for (RemoteProducer connection : producers) {
+            try {
+                connection.stop();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        for (RemoteConsumer connection : consumers) {
+            try {
+                connection.stop();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        for (Broker broker : brokers) {
+            try {
+                broker.stop();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        if (dispatcher != null) {
+            try {
+                dispatcher.shutdown();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+    /** Starts the brokers first, then the consumers, and the producers last. */
+    private void startServices() throws Exception {
+        for (Broker each : brokers) {
+            each.start();
+        }
+
+        for (RemoteConsumer receiver : consumers) {
+            receiver.start();
+        }
+
+        for (RemoteProducer sender : producers) {
+            sender.start();
+        }
+    }
+
+}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=752995&r1=752994&r2=752995&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java Thu Mar 12 20:18:25 2009
@@ -1,11 +1,24 @@
 package org.apache.activemq.broker.openwire;
 
+import static org.apache.activemq.broker.openwire.Openwire2Support.createConnectionInfo;
+import static org.apache.activemq.broker.openwire.Openwire2Support.createConsumerInfo;
+import static org.apache.activemq.broker.openwire.Openwire2Support.createSessionInfo;
+
 import java.net.URI;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.Connection;
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowSink;
@@ -32,27 +45,41 @@
 
     protected final Object inboundMutex = new Object();
     private FlowController<MessageDelivery> inboundController;
+
+    private ActiveMQDestination activemqDestination;
+
+    private ConnectionInfo connectionInfo;
+
+    private SessionInfo sessionInfo;
+
+    private ConsumerInfo consumerInfo;
     
     public void start() throws Exception {
         consumerRate.name("Consumer " + name + " Rate");
         totalConsumerRate.add(consumerRate);
 
-        transport = TransportFactory.compositeConnect(uri);
+        initialize();
+        transport = TransportFactory.connect(uri);
         if(transport instanceof DispatchableTransport)
         {
-            DispatchableTransport dt = ((DispatchableTransport)transport);
-            dt.setName(name + "-client-transport");
-            dt.setDispatcher(getDispatcher());
             schedualWait = true;
         }
-        transport.setTransportListener(this);
-        transport.start();
+        super.start();
+
+        if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
+            activemqDestination = new ActiveMQQueue(destination.getName().toString());
+        } else {
+            activemqDestination = new ActiveMQTopic(destination.getName().toString());
+        }
+        
+        connectionInfo = createConnectionInfo();
+        transport.oneway(connectionInfo);
+        sessionInfo = createSessionInfo(connectionInfo);
+        transport.oneway(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, activemqDestination);
+        consumerInfo.setPrefetchSize(1000);
+        transport.oneway(consumerInfo);
         
-        // Let the remote side know our name.
-        transport.oneway(name);
-        // Sending the destination acts as the subscribe.
-        transport.oneway(destination);
-        super.initialize();
     }
     
     protected void initialize() {
@@ -78,7 +105,10 @@
     
     public void onCommand(Object command) {
         try {
-            if (command.getClass() == MessageDelivery.class) {
+            if (command.getClass() == WireFormatInfo.class) {
+            } else if (command.getClass() == BrokerInfo.class) {
+                System.out.println("Consumer "+name+" connected to "+((BrokerInfo)command).getBrokerName());
+            } else if (command.getClass() == MessageDelivery.class) {
                 MessageDelivery msg = (MessageDelivery) command;
                 inboundController.add(msg, null);
             } else {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java?rev=752995&r1=752994&r2=752995&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java Thu Mar 12 20:18:25 2009
@@ -1,12 +1,11 @@
 package org.apache.activemq.broker.openwire;
 
-import static org.apache.activemq.broker.openwire.OpenWireSupport.createConnectionInfo;
-import static org.apache.activemq.broker.openwire.OpenWireSupport.createMessage;
-import static org.apache.activemq.broker.openwire.OpenWireSupport.createProducerInfo;
-import static org.apache.activemq.broker.openwire.OpenWireSupport.createSessionInfo;
+import static org.apache.activemq.broker.openwire.Openwire2Support.createConnectionInfo;
+import static org.apache.activemq.broker.openwire.Openwire2Support.createMessage;
+import static org.apache.activemq.broker.openwire.Openwire2Support.createProducerInfo;
+import static org.apache.activemq.broker.openwire.Openwire2Support.createSessionInfo;
 
 import java.net.URI;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.JMSException;
@@ -15,15 +14,17 @@
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.Router;
-import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
 import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
 import org.apache.activemq.flow.Flow;
@@ -35,7 +36,6 @@
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;
 import org.apache.activemq.queue.SingleFlowRelay;
-import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.TransportFactory;
 
 public class RemoteProducer extends Connection implements Dispatchable, FlowUnblockListener<MessageDelivery> {
@@ -61,8 +61,9 @@
     private ActiveMQDestination activemqDestination;
 
     private WindowLimiter<MessageDelivery> outboundLimiter;
-
     private IFlowController<MessageDelivery> outboundController;
+
+    private SingleFlowRelay<MessageDelivery> outboundQueue;
     
     public void start() throws Exception {
         
@@ -77,18 +78,9 @@
         rate.name("Producer " + name + " Rate");
         totalProducerRate.add(rate);
 
-        transport = TransportFactory.compositeConnect(uri);
-        transport.setTransportListener(this);
-        if(transport instanceof DispatchableTransport)
-        {
-            DispatchableTransport dt = ((DispatchableTransport)transport);
-            dt.setName(name + "-client-transport");
-            dt.setDispatcher(getDispatcher());
-        }
-        super.setTransport(transport);
-       
-        super.initialize();
-        transport.start();
+        initialize();
+        transport = TransportFactory.connect(uri);
+        super.start();
         
         if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
             activemqDestination = new ActiveMQQueue(destination.getName().toString());
@@ -111,26 +103,15 @@
     protected void initialize() {
         Flow ouboundFlow = new Flow(name, false);
         outboundLimiter = new WindowLimiter<MessageDelivery>(true, ouboundFlow, outputWindowSize, outputResumeThreshold);
-        outputQueue = new SingleFlowRelay<MessageDelivery>(ouboundFlow, name + "-outbound", outboundLimiter);
-        outboundController = outputQueue.getFlowController(ouboundFlow);
-
-        if (transport instanceof DispatchableTransport) {
-            outputQueue.setDrain(new IFlowDrain<MessageDelivery>() {
-
-                public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) {
-                    write(message);
-                }
-            });
-
-        } else {
-            blockingTransport = true;
-            blockingWriter = Executors.newSingleThreadExecutor();
-            outputQueue.setDrain(new IFlowDrain<MessageDelivery>() {
-                public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
-                    write(message);
-                };
-            });
-        }
+        outboundQueue = new SingleFlowRelay<MessageDelivery>(ouboundFlow, name + "-outbound", outboundLimiter);
+        
+        outboundController = outboundQueue.getFlowController(ouboundFlow);
+        outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+            public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) {
+                Message msg = message.asType(Message.class);
+                write(msg);
+            }
+        });
     }
     
     public void stop() throws Exception
@@ -142,9 +123,12 @@
     
     public void onCommand(Object command) {
         try {
-            if (command.getClass() == ProducerAck.class) {
+            if (command.getClass() == WireFormatInfo.class) {
+            } else if (command.getClass() == BrokerInfo.class) {
+                System.out.println("Producer "+name+" connected to "+((BrokerInfo)command).getBrokerName());
+            } else if (command.getClass() == ProducerAck.class) {
                 ProducerAck fc = (ProducerAck) command;
-                synchronized (outputQueue) {
+                synchronized (outboundQueue) {
                     outboundLimiter.onProtocolCredit(fc.getSize());
                 }
             } else {
@@ -190,7 +174,7 @@
 				}
 			}
 			
-	        getSink().add(next, null);
+			outboundQueue.add(next, null);
 	        rate.increment();
 	        next = null;
 		}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java?rev=752995&r1=752994&r2=752995&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java Thu Mar 12 20:18:25 2009
@@ -11,7 +11,6 @@
 import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
 import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
 import org.apache.activemq.flow.Commands.Message.MessageBean;
-import org.apache.activemq.flow.Commands.Message.MessageBuffer;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.DataByteArrayInputStream;
 import org.apache.activemq.util.DataByteArrayOutputStream;