You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/12 19:59:13 UTC

svn commit: r752973 [1/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ test/java/org/apache/activemq/broker/ test/java/org/apache/activemq/...

Author: chirino
Date: Thu Mar 12 18:59:12 2009
New Revision: 752973

URL: http://svn.apache.org/viewvc?rev=752973&view=rev
Log:
Refactoring the MockBroker so that it is more agnostic of the wire protocol used.


Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,219 @@
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.IFlowRelay;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.Message;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.transport.DispatchableTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+
+abstract public class Connection implements TransportListener, DeliveryTarget {
+
+    protected Transport transport;
+
+    protected String name;
+
+    private int priorityLevels;
+
+    protected final int outputWindowSize = 1000;
+    protected final int outputResumeThreshold = 900;
+
+    protected final int inputWindowSize = 1000;
+    protected final int inputResumeThreshold = 500;
+    
+    protected IFlowRelay<MessageDelivery> outputQueue;
+
+    private IDispatcher dispatcher;
+    private final AtomicBoolean stopping = new AtomicBoolean();
+    protected Flow outputFlow;
+    protected boolean blockingTransport = false;
+    protected ExecutorService blockingWriter;
+
+    public void setTransport(Transport transport) {
+        this.transport = transport;
+    }
+
+    public void start() throws Exception {
+        transport.setTransportListener(this);
+        transport.start();
+    }
+
+    public void stop() throws Exception {
+        stopping.set(true);
+        if (transport != null) {
+            transport.stop();
+        }
+        if (blockingWriter != null) {
+            blockingWriter.shutdown();
+        }
+    }
+
+    protected void initialize() {
+    }
+    
+    protected final void write(final Object o) {
+        synchronized (outputQueue) {
+            if (!blockingTransport) {
+                try {
+                    transport.oneway(o);
+                } catch (IOException e) {
+                    onException(e);
+                }
+            } else {
+                try {
+                    blockingWriter.execute(new Runnable() {
+                        public void run() {
+                            if (!stopping.get()) {
+                                try {
+                                    transport.oneway(o);
+                                } catch (IOException e) {
+                                    onException(e);
+                                }
+                            }
+                        }
+                    });
+                } catch (RejectedExecutionException re) {
+                    //Must be shutting down.
+                }
+            }
+        }
+    }
+    
+    public void onException(IOException error) {
+        onException((Exception) error);
+    }
+
+    public void onException(Exception error) {
+        if (!isStopping()) {
+            System.out.println("RemoteConnection error: " + error);
+            error.printStackTrace();
+        }
+    }
+
+    public boolean isStopping(){ 
+        return stopping.get();
+    }
+
+    public void transportInterupted() {
+    }
+
+    public void transportResumed() {
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public int getPriorityLevels() {
+        return priorityLevels;
+    }
+
+    public void setPriorityLevels(int priorityLevels) {
+        this.priorityLevels = priorityLevels;
+    }
+
+    public IDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setDispatcher(IDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+        if (transport instanceof DispatchableTransport) {
+            DispatchableTransport dt = ((DispatchableTransport) transport);
+            if (name != null) {
+                dt.setName(name);
+            }
+            dt.setDispatcher(getDispatcher());
+        }
+    }
+
+    public int getOutputWindowSize() {
+        return outputWindowSize;
+    }
+
+    public int getOutputResumeThreshold() {
+        return outputResumeThreshold;
+    }
+
+    public int getInputWindowSize() {
+        return inputWindowSize;
+    }
+
+    public int getInputResumeThreshold() {
+        return inputResumeThreshold;
+    }
+
+    public IFlowRelay<MessageDelivery> getSink() {
+        return outputQueue;
+    }
+
+    public boolean match(MessageDelivery message) {
+        return true;
+    }
+
+    protected interface ProtocolLimiter<E> extends IFlowLimiter<E> {
+        public void onProtocolCredit(int credit);
+    }
+
+    protected class WindowLimiter<E> extends SizeLimiter<E> implements ProtocolLimiter<E> {
+        final Flow flow;
+        final boolean clientMode;
+        private int available;
+
+        public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) {
+            super(capacity, resumeThreshold);
+            this.clientMode = clientMode;
+            this.flow = flow;
+        }
+
+        public void reserve(E elem) {
+            super.reserve(elem);
+//            if (!clientMode) {
+//                 System.out.println(name + " Reserved " + this);
+//            }
+        }
+
+        /**
+         * Releases the reservation held for {@code elem} by delegating to the
+         * superclass limiter.
+         *
+         * @param elem the element whose reservation is being released
+         */
+        public void releaseReserved(E elem) {
+            // Delegate to releaseReserved(); calling reserve() here would reserve
+            // the element a second time instead of giving the reservation back.
+            super.releaseReserved(elem);
+//            if (!clientMode) {
+//                System.out.println(name + " Released Reserved " + this);
+//            }
+        }
+
+        protected void remove(int size) {
+            super.remove(size);
+            if (!clientMode) {
+                available += size;
+                if (available >= capacity - resumeThreshold) {
+                    sendCredit(available);
+                    available = 0;
+                }
+            }
+        }
+
+        protected void sendCredit(int credit) {
+            throw new UnsupportedOperationException("Please override this method to provide and implemenation.");
+        }
+
+        public void onProtocolCredit(int credit) {
+            remove(credit);
+        }
+
+        public int getElementSize(MessageDelivery m) {
+            return m.getFlowLimiterSize();
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,124 @@
+package org.apache.activemq.broker;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.Connection;
+import org.apache.activemq.broker.openwire.OpenwireBrokerConnection;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.transport.DispatchableTransportServer;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportServer;
+
+public class Broker implements TransportAcceptListener {
+
+    public static final int MAX_USER_PRIORITY = 10;
+    public static final int MAX_PRIORITY = MAX_USER_PRIORITY + 1;
+    
+    final Router router = new Router();
+
+    final ArrayList<Connection> clientConnections = new ArrayList<Connection>();
+    final HashMap<Destination, Queue> queues = new HashMap<Destination, Queue>();
+
+    private TransportServer transportServer;
+    private String uri;
+    private String name;
+    private IDispatcher dispatcher;
+    private final AtomicBoolean stopping = new AtomicBoolean();
+
+    public String getName() {
+        return name;
+    }
+
+
+    public void addQueue(Queue queue) {
+        Domain domain = router.getDomain(queue.getDestination().getDomain());
+        domain.add(queue.getDestination().getName(), queue);
+    }
+
+    public final void stop() throws Exception {
+        stopping.set(true);
+        transportServer.stop();
+        
+        for (Connection connection : clientConnections) {
+            connection.stop();
+        }
+        for (Queue queue : queues.values()) {
+            queue.stop();
+        }
+        dispatcher.shutdown();
+
+    }
+
+    public final void start() throws Exception {
+
+        dispatcher.start();
+
+        transportServer = TransportFactory.bind(new URI(uri));
+        transportServer.setAcceptListener(this);
+        if (transportServer instanceof DispatchableTransportServer) {
+            ((DispatchableTransportServer) transportServer).setDispatcher(dispatcher);
+        }
+        transportServer.start();
+
+        for (Queue queue : queues.values()) {
+            queue.start();
+        }
+    }
+
+    public void onAccept(final Transport transport) {
+        OpenwireBrokerConnection connection = new OpenwireBrokerConnection();
+        connection.setBroker(this);
+        connection.setTransport(transport);
+        connection.setPriorityLevels(MAX_PRIORITY);
+        connection.setDispatcher(dispatcher);
+        clientConnections.add(connection);
+        try {
+            connection.start();
+        } catch (Exception e1) {
+            onAcceptError(e1);
+        }
+    }
+
+    public void onAcceptError(Exception error) {
+        System.out.println("Accept error: " + error);
+        error.printStackTrace();
+    }
+
+    public IDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setDispatcher(IDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    public URI getConnectURI() {
+        return transportServer.getConnectURI();
+    }
+
+    public boolean isStopping() {
+        return stopping.get();
+    }
+
+    public Router getRouter() {
+        return router;
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,23 @@
+package org.apache.activemq.broker;
+
+import org.apache.activemq.Connection;
+
+abstract public class BrokerConnection extends Connection {
+    
+    protected Broker broker;
+
+    public Broker getBroker() {
+        return broker;
+    }
+
+    public void setBroker(Broker broker) {
+        this.broker = broker;
+    }
+    
+    @Override
+    public boolean isStopping() {
+        return super.isStopping() || broker.isStopping();
+    }
+    
+    
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,14 @@
+/**
+ * 
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.flow.IFlowSink;
+
+public interface DeliveryTarget {
+    
+    public IFlowSink<MessageDelivery> getSink();
+    
+    public boolean match(MessageDelivery message);
+    
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,84 @@
+package org.apache.activemq.broker;
+
+import java.util.Collection;
+
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+public interface Destination {
+
+    AsciiBuffer getDomain();
+    AsciiBuffer getName();
+    Collection<Destination> getDestinations();
+    
+    public class SingleDestination implements Destination {
+
+        private AsciiBuffer domain;
+        private AsciiBuffer name;
+        
+        public SingleDestination() {
+        }
+        public SingleDestination(AsciiBuffer domain, AsciiBuffer name) {
+            setDomain(domain);
+            setName(name);
+        }
+        public SingleDestination(String domain, String name) {
+            setDomain(domain);
+            setName(name);
+        }
+
+        public Collection<Destination> getDestinations() {
+            return null;
+        }
+
+        public AsciiBuffer getDomain() {
+            return domain;
+        }
+
+        public AsciiBuffer getName() {
+            return name;
+        }
+        public void setDomain(AsciiBuffer domain) {
+            this.domain = domain;
+        }
+        public void setName(AsciiBuffer name) {
+            this.name = name;
+        }
+        
+        private void setName(String name) {
+            setName(new AsciiBuffer(name));
+        }
+        private void setDomain(String domain) {
+            setDomain(new AsciiBuffer(domain));
+        }
+    }
+    
+    public class MultiDestination implements Destination {
+
+        private Collection<Destination> destinations;
+
+        public MultiDestination() {
+        }
+
+        public MultiDestination(Collection<Destination> destinations) {
+            this.destinations=destinations;
+        }
+
+        public Collection<Destination> getDestinations() {
+            return destinations;
+        }
+        
+        public void setDestinations(Collection<Destination> destinations) {
+            this.destinations = destinations;
+        }
+
+        public AsciiBuffer getDomain() {
+            return null;
+        }
+
+        public AsciiBuffer getName() {
+            return null;
+        }
+
+    }
+    
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,23 @@
+package org.apache.activemq.broker;
+
+import java.util.Collection;
+
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+/**
+ * Represents a messaging domain like pub/sub or point to point in JMS terms or an Exchange in
+ * AMQP terms.
+ * 
+ * @author chirino
+ */
+public interface Domain {
+
+    public void add(AsciiBuffer name, Object value);
+    
+    public Object remove(AsciiBuffer name);
+
+    public void bind(AsciiBuffer name, DeliveryTarget dt);
+
+    public Collection<DeliveryTarget> route(MessageDelivery msg);
+    
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,22 @@
+package org.apache.activemq.broker;
+
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+public interface MessageDelivery {
+
+    public Destination getDestination();
+
+    public int getPriority();
+
+    public int getFlowLimiterSize();
+
+    public AsciiBuffer getMsgId();
+
+    public AsciiBuffer getProducerId();
+
+    public void setCompletionCallback(Runnable runnable);
+    public Runnable getCompletionCallback();
+
+    public <T> T asType(Class<T> type);
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,159 @@
+/**
+ * 
+ */
+package org.apache.activemq.broker;
+
+import java.util.HashMap;
+
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.PrioritySizeLimiter;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.Mapper;
+import org.apache.activemq.queue.PartitionedQueue;
+import org.apache.activemq.queue.SharedPriorityQueue;
+import org.apache.activemq.queue.SharedQueue;
+import org.apache.activemq.queue.Subscription;
+
+public class Queue implements DeliveryTarget {
+
+    HashMap<DeliveryTarget, Subscription<MessageDelivery>> subs = new HashMap<DeliveryTarget, Subscription<MessageDelivery>>();
+    private Destination destination;
+    private IQueue<AsciiBuffer, MessageDelivery> queue;
+    private Broker broker;
+    
+    private Mapper<Integer, MessageDelivery> partitionMapper;
+    private Mapper<AsciiBuffer, MessageDelivery> keyExtractor;
+
+    private IQueue<AsciiBuffer, MessageDelivery> createQueue() {
+
+        if (partitionMapper!=null) {
+            PartitionedQueue<Integer, AsciiBuffer, MessageDelivery> queue = new PartitionedQueue<Integer, AsciiBuffer, MessageDelivery>() {
+                @Override
+                protected IQueue<AsciiBuffer, MessageDelivery> cratePartition(Integer partitionKey) {
+                    return createSharedFlowQueue();
+                }
+            };
+            queue.setPartitionMapper(partitionMapper);
+            queue.setResourceName(destination.getName().toString());
+            return queue;
+        } else {
+            return createSharedFlowQueue();
+        }
+    }
+
+
+    public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
+        public Integer map(MessageDelivery element) {
+            return element.getPriority();
+        }
+    };
+    
+    private IQueue<AsciiBuffer, MessageDelivery> createSharedFlowQueue() {
+        if (Broker.MAX_PRIORITY > 1) {
+            PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(100, 1, Broker.MAX_PRIORITY);
+            limiter.setPriorityMapper(PRIORITY_MAPPER);
+            SharedPriorityQueue<AsciiBuffer, MessageDelivery> queue = new SharedPriorityQueue<AsciiBuffer, MessageDelivery>(destination.getName().toString(), limiter);
+            queue.setKeyMapper(keyExtractor);
+            queue.setAutoRelease(true);
+            queue.setDispatcher(broker.getDispatcher());
+            return queue;
+        } else {
+            SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 1);
+            SharedQueue<AsciiBuffer, MessageDelivery> queue = new SharedQueue<AsciiBuffer, MessageDelivery>(destination.getName().toString(), limiter);
+            queue.setKeyMapper(keyExtractor);
+            queue.setAutoRelease(true);
+            queue.setDispatcher(broker.getDispatcher());
+            return queue;
+        }
+    }
+
+    public final void deliver(ISourceController<MessageDelivery> source, MessageDelivery msg) {
+        queue.add(msg, source);
+    }
+    
+    public final Destination getDestination() {
+        return destination;
+    }
+
+    public final void addConsumer(final DeliveryTarget dt) {
+        Subscription<MessageDelivery> sub = new Subscription<MessageDelivery>() {
+            public boolean isPreAcquired() {
+                return true;
+            }
+
+            public boolean matches(MessageDelivery message) {
+                return dt.match(message);
+            }
+
+            public boolean isRemoveOnDispatch() {
+                return true;
+            }
+
+            public IFlowSink<MessageDelivery> getSink() {
+                return dt.getSink();
+            }
+
+            @Override
+            public String toString() {
+                return getSink().toString();
+            }
+        };
+        subs.put(dt, sub);
+        queue.addSubscription(sub);
+    }
+
+    public boolean removeSubscirption(final DeliveryTarget dt) {
+        Subscription<MessageDelivery> sub = subs.remove(dt);
+        if (sub != null) {
+            return queue.removeSubscription(sub);
+        }
+        return false;
+    }
+
+    public void start() throws Exception {
+        queue = createQueue();
+    }
+
+    public void stop() throws Exception {
+    }
+
+    public IFlowSink<MessageDelivery> getSink() {
+        return queue;
+    }
+
+    public boolean match(MessageDelivery message) {
+        return true;
+    }
+
+    public Broker getBroker() {
+        return broker;
+    }
+
+    public void setBroker(Broker broker) {
+        this.broker = broker;
+    }
+
+    public Mapper<Integer, MessageDelivery> getPartitionMapper() {
+        return partitionMapper;
+    }
+
+    public void setPartitionMapper(Mapper<Integer, MessageDelivery> partitionMapper) {
+        this.partitionMapper = partitionMapper;
+    }
+
+    public Mapper<AsciiBuffer, MessageDelivery> getKeyExtractor() {
+        return keyExtractor;
+    }
+
+    public void setKeyExtractor(Mapper<AsciiBuffer, MessageDelivery> keyExtractor) {
+        this.keyExtractor = keyExtractor;
+    }
+
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,34 @@
+package org.apache.activemq.broker;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+public class QueueDomain implements Domain {
+    
+    final HashMap<AsciiBuffer, Queue> queues = new HashMap<AsciiBuffer, Queue>();
+
+    public void add(AsciiBuffer name, Object queue) {
+        queues.put(name, (Queue)queue);
+    }
+    public Object remove(AsciiBuffer name) {
+        return queues.remove(name);
+    }
+
+    public void bind(AsciiBuffer name, DeliveryTarget deliveryTarget) {
+        queues.get(name).addConsumer(deliveryTarget);
+    }
+
+    /**
+     * Finds the queue registered under the name of the message's destination.
+     *
+     * @param msg the delivery to route
+     * @return a single-element collection holding the matching queue, or
+     *         {@code null} when no queue is registered under that name
+     */
+    public Collection<DeliveryTarget> route(MessageDelivery msg) {
+        // The map is keyed by destination name (see add/bind), so look up by
+        // that name; using the delivery object itself as the key never matches.
+        Queue queue = queues.get(msg.getDestination().getName());
+        if( queue==null ) {
+            return null;
+        }
+        ArrayList<DeliveryTarget> rc = new ArrayList<DeliveryTarget>(1);
+        rc.add(queue);
+        return rc;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,46 @@
+/**
+ * 
+ */
+package org.apache.activemq.broker;
+
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+final public class Router {
+    
+    public static final AsciiBuffer TOPIC_DOMAIN = new AsciiBuffer("topic");
+    public static final AsciiBuffer QUEUE_DOMAIN = new AsciiBuffer("queue");
+    
+    private final HashMap<AsciiBuffer, Domain> domains = new HashMap<AsciiBuffer, Domain>();
+    
+    public Router() {
+        domains.put(QUEUE_DOMAIN, new QueueDomain());
+        domains.put(TOPIC_DOMAIN, new TopicDomain());
+    }
+    
+    public Domain getDomain(AsciiBuffer name) {
+        return domains.get(name);
+    }
+
+    public Domain putDomain(AsciiBuffer name, Domain domain) {
+        return domains.put(name, domain);
+    }
+
+    public Domain removeDomain(Object name) {
+        return domains.remove(name);
+    }
+
+    
+    public synchronized void bind(Destination destination, DeliveryTarget dt) {
+        Domain domain = domains.get(destination.getDomain());
+        domain.bind(destination.getName(), dt);
+    }
+
+    public Collection<DeliveryTarget> route(MessageDelivery msg) {
+        Domain domain = domains.get(msg.getDestination().getDomain());
+        return domain.route(msg);
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,32 @@
+package org.apache.activemq.broker;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+public class TopicDomain implements Domain {
+    
+    final HashMap<AsciiBuffer, ArrayList<DeliveryTarget>> topicsTargets = new HashMap<AsciiBuffer, ArrayList<DeliveryTarget>>();
+
+    public void add(AsciiBuffer name, Object queue) {
+    }
+    public Object remove(AsciiBuffer name) {
+        return null;
+    }
+
+    public void bind(AsciiBuffer name, DeliveryTarget target) {
+        ArrayList<DeliveryTarget> targets = topicsTargets.get(name);
+        if (targets == null) {
+            targets = new ArrayList<DeliveryTarget>();
+            topicsTargets.put(name, targets);
+        }
+        targets.add(target);
+    }
+
+    /**
+     * Returns the targets bound to the name of the message's destination.
+     *
+     * @param msg the delivery to route
+     * @return the list of targets bound under that name, or {@code null} when
+     *         nothing has been bound to it
+     */
+    public Collection<DeliveryTarget> route(MessageDelivery msg) {
+        // topicsTargets is keyed by the name passed to bind(), so look up by the
+        // destination name; the delivery object itself is never a key in the map.
+        return topicsTargets.get(msg.getDestination().getName());
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,68 @@
+package org.apache.activemq.broker.openwire;
+
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+/**
+ * {@link MessageDelivery} view over an OpenWire {@link Message}.
+ * <p>
+ * The destination and producer id are converted from their OpenWire form on
+ * first access and cached in this object afterwards.
+ */
+public class OpenWireMessageDelivery implements MessageDelivery {
+
+    private final Message message;
+    // Lazily converted from message.getDestination(); null until first requested.
+    private Destination destination;
+    // Lazily built from message.getProducerId(); null until first requested.
+    private AsciiBuffer producerId;
+    private Runnable completionCallback;
+
+    /**
+     * @param message the OpenWire message this delivery wraps
+     */
+    public OpenWireMessageDelivery(Message message) {
+        this.message = message;
+    }
+
+    /**
+     * @return the message's destination converted with
+     *         {@link OpenwireBrokerConnection#convert}; computed once and cached
+     */
+    public Destination getDestination() {
+        if( destination == null ) {
+            destination = OpenwireBrokerConnection.convert(message.getDestination());
+        }
+        return destination;
+    }
+
+    /** @return the wrapped message's size, as reported by {@code Message.getSize()} */
+    public int getFlowLimiterSize() {
+        return message.getSize();
+    }
+
+    /** @return the wrapped message's priority */
+    public int getPriority() {
+        return message.getPriority();
+    }
+
+    /** Not implemented yet: always returns {@code null}. */
+    public AsciiBuffer getMsgId() {
+        return null;
+    }
+
+    /**
+     * @return the string form of the message's producer id as an
+     *         {@link AsciiBuffer}; computed once and cached
+     */
+    public AsciiBuffer getProducerId() {
+        if( producerId == null ) {
+            producerId = new AsciiBuffer(message.getProducerId().toString());
+        }
+        return producerId;
+    }
+
+    /** @return the wrapped OpenWire message */
+    public Message getMessage() {
+        return message;
+    }
+
+    public Runnable getCompletionCallback() {
+        return completionCallback;
+    }
+
+    public void setCompletionCallback(Runnable completionCallback) {
+        this.completionCallback = completionCallback;
+    }
+
+    /**
+     * Returns the wrapped message viewed as {@code type}.
+     *
+     * @param type the requested representation
+     * @return the wrapped message cast to {@code type}, or {@code null} when
+     *         the message is not an instance of that type
+     */
+    public <T> T asType(Class<T> type) {
+        // The previous check, message.getClass().isAssignableFrom(type), was
+        // inverted: it rejected supertypes of the message's class and let
+        // subtypes through to a cast that throws ClassCastException.
+        // isInstance() is the reflective form of instanceof, so the cast below
+        // is always safe; it also covers the Message.class case.
+        if( type.isInstance(message) ) {
+            return type.cast(message);
+        }
+        return null;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,472 @@
+package org.apache.activemq.broker.openwire;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.BrokerConnection;
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ControlCommand;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.FlushCommand;
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.LogicExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NoLocalExpression;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowController;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.state.CommandVisitor;
+
+public class OpenwireBrokerConnection extends BrokerConnection {
+    
+    protected final HashMap<ProducerId, ProducerContext> producers = new HashMap<ProducerId, ProducerContext>();
+    protected final HashMap<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
+    
+    protected final Object inboundMutex = new Object();
+    protected IFlowController<MessageDelivery> inboundController;
+
+    protected IFlowController<MessageDelivery> outboundController;
+//    public ProtocolLimiter<MessageDelivery> outboundLimiter;
+    protected Flow ouboundFlow;
+
+    public void onCommand(Object o) {
+        final Command command = (Command) o;
+        boolean responseRequired = command.isResponseRequired();
+        try {
+            command.visit(new CommandVisitor() {
+
+                ///////////////////////////////////////////////////////////////////
+                // Methods that keep track of the client state
+                ///////////////////////////////////////////////////////////////////
+                public Response processAddConnection(ConnectionInfo info) throws Exception {
+                    return ack(command);
+                }
+                public Response processAddSession(SessionInfo info) throws Exception {
+                    return ack(command);
+                }
+                public Response processAddProducer(ProducerInfo info) throws Exception {
+                    producers.put(info.getProducerId(), new ProducerContext(info));
+                    return ack(command);
+                }
+                public Response processAddConsumer(ConsumerInfo info) throws Exception {
+                    ConsumerContext ctx = new ConsumerContext(info);
+                    consumers.put(info.getConsumerId(), ctx);
+                    
+                    broker.getRouter().bind(convert(info.getDestination()), ctx);
+                    
+                    
+                    return ack(command);
+                }
+                public Response processRemoveConnection(ConnectionId info, long arg1) throws Exception {
+                    return ack(command);
+                }
+                public Response processRemoveSession(SessionId info, long arg1) throws Exception {
+                    return ack(command);
+                }
+                public Response processRemoveProducer(ProducerId info) throws Exception {
+                    producers.remove(info);
+                    return ack(command);
+                }
+                public Response processRemoveConsumer(ConsumerId info, long arg1) throws Exception {
+                    return ack(command);
+                }
+
+                ///////////////////////////////////////////////////////////////////
+                // Message Processing Methods.
+                ///////////////////////////////////////////////////////////////////
+                public Response processMessage(Message info) throws Exception {
+                    ProducerId producerId = info.getProducerId();
+                    ProducerContext producerContext = producers.get(producerId);
+                    
+                    OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
+                    
+                    // Only producers that are not using a window will block, and if it blocks.
+                    // yes we block the connection's read thread.  yes other sessions will not get
+                    // serviced while we block here.  The producer is depending on TCP flow 
+                    // control to slow him down so we have to stop ready from the socket at this 
+                    // point.
+                    while( !producerContext.controller.offer(md, null) ) {
+                        producerContext.controller.waitForFlowUnblock();
+                    }
+                    return null;
+                }
+                public Response processMessageAck(MessageAck info) throws Exception {
+                    return ack(command);
+                }
+                public Response processMessagePull(MessagePull info) throws Exception {
+                    return ack(command);
+                }
+                public Response processProducerAck(ProducerAck info) throws Exception {
+                    return ack(command);
+                }
+
+                ///////////////////////////////////////////////////////////////////
+                // Control Methods
+                ///////////////////////////////////////////////////////////////////
+                public Response processWireFormat(WireFormatInfo info) throws Exception {
+                    return ack(command);
+                }
+                public Response processShutdown(ShutdownInfo info) throws Exception {
+                    return ack(command);
+                }
+                public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+                    return ack(command);
+                }
+                public Response processFlush(FlushCommand info) throws Exception {
+                    return ack(command);
+                }
+                public Response processConnectionControl(ConnectionControl info) throws Exception {
+                    return ack(command);
+                }
+                public Response processConnectionError(ConnectionError info) throws Exception {
+                    return ack(command);
+                }
+                public Response processConsumerControl(ConsumerControl info) throws Exception {
+                    return ack(command);
+                }
+
+                ///////////////////////////////////////////////////////////////////
+                // Methods for server management
+                ///////////////////////////////////////////////////////////////////
+                public Response processAddDestination(DestinationInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+                public Response processRemoveDestination(DestinationInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+                public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+                public Response processControlCommand(ControlCommand info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                ///////////////////////////////////////////////////////////////////
+                // Methods for transaction management
+                ///////////////////////////////////////////////////////////////////
+                public Response processBeginTransaction(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+                public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+                public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+                public Response processEndTransaction(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+                public Response processForgetTransaction(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+                public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+                public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+                public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+
+                ///////////////////////////////////////////////////////////////////
+                // Methods for cluster operations
+                ///////////////////////////////////////////////////////////////////
+                public Response processBrokerInfo(BrokerInfo info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+                public Response processMessageDispatch(MessageDispatch info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+                public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception {
+                    throw new UnsupportedOperationException();
+                }
+            });
+        } catch (Exception e) {
+            if (responseRequired) {
+                ExceptionResponse response = new ExceptionResponse(e);
+                response.setCorrelationId(command.getCommandId());
+                write(response);
+            } else {
+                onException(e);
+            }
+            
+        }
+    }
+
+
+    ///////////////////////////////////////////////////////////////////
+    // Internal Support Methods
+    ///////////////////////////////////////////////////////////////////
+
+    /**
+     * Builds (but does not send) the acknowledgement for a command.
+     *
+     * @param command the command to acknowledge
+     * @return a {@link Response} whose correlation id is the command's id, or
+     *         {@code null} when the command does not require a response
+     */
+    private Response ack(Command command) {
+        if( !command.isResponseRequired() ) {
+            return null;
+        }
+        Response response = new Response();
+        response.setCorrelationId(command.getCommandId());
+        return response;
+    }
+    
+    /**
+     * Starts the connection via the superclass, then writes a
+     * {@link BrokerInfo} carrying the broker's name (also used as its id) and
+     * connect URI to the peer.
+     */
+    @Override
+    public void start() throws Exception {
+        super.start();
+
+        BrokerInfo brokerInfo = new BrokerInfo();
+        brokerInfo.setBrokerId(new BrokerId(broker.getName()));
+        brokerInfo.setBrokerName(broker.getName());
+        brokerInfo.setBrokerURL(broker.getConnectURI().toString());
+
+        write(brokerInfo);
+    }
+    
+    /**
+     * Do-nothing {@link FlowControllable} base class: reports no sink and no
+     * source and ignores accepted elements. Subclasses override only the
+     * callbacks they care about.
+     */
+    static class FlowControllableAdapter implements FlowControllable<MessageDelivery> {
+
+        /** @return always {@code null} */
+        public IFlowSink<MessageDelivery> getFlowSink() {
+            return null;
+        }
+
+        /** @return always {@code null} */
+        public IFlowSource<MessageDelivery> getFlowSource() {
+            return null;
+        }
+
+        /** Ignores the element. */
+        public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+        }
+    }
+    
+    
+    /**
+     * Per-consumer state. Acts as a {@link DeliveryTarget} that accepts only
+     * the messages matching the selector built from the consumer's
+     * {@link ConsumerInfo}.
+     */
+    class ConsumerContext implements DeliveryTarget {
+        
+        private final ConsumerInfo info;
+        private String name;
+        private BooleanExpression selector;
+
+        /**
+         * @param info the consumer registration this context represents
+         * @throws InvalidSelectorException if the consumer's selector cannot be parsed
+         */
+        public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException {
+            this.info = info;
+            this.name = info.getConsumerId().toString();
+            this.selector = parseSelector(info);
+        }
+
+        /** Not implemented yet: always returns {@code null}. */
+        public IFlowSink<MessageDelivery> getSink() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        /**
+         * Decides whether this consumer wants the given delivery.
+         *
+         * @return {@code false} when the delivery has no OpenWire
+         *         {@link Message} form or the selector evaluation fails;
+         *         {@code true} when there is no selector; otherwise the
+         *         selector's verdict
+         */
+        public boolean match(MessageDelivery message) {
+            Message openwireMessage = message.asType(Message.class);
+            if( openwireMessage == null ) {
+                return false;
+            }
+
+            MessageEvaluationContext evaluation = new MessageEvaluationContext();
+            evaluation.setMessageReference(openwireMessage);
+            evaluation.setDestination(info.getDestination());
+
+            if( selector == null ) {
+                return true;
+            }
+            try {
+                return selector.matches(evaluation);
+            } catch (JMSException e) {
+                e.printStackTrace();
+                return false;
+            }
+        }
+        
+    }
+
+    /**
+     * Builds the filter expression for a consumer by AND-ing together, when
+     * present: its parsed selector string, a no-local restriction on the
+     * consumer's own connection id, and its additional predicate.
+     *
+     * @param info the consumer registration
+     * @return the combined expression, or {@code null} when none of the three
+     *         parts applies
+     * @throws InvalidSelectorException if the selector string cannot be parsed
+     */
+    private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
+        BooleanExpression combined = null;
+        if (info.getSelector() != null) {
+            combined = SelectorParser.parse(info.getSelector());
+        }
+
+        if (info.isNoLocal()) {
+            BooleanExpression noLocal = new NoLocalExpression(info.getConsumerId().getConnectionId());
+            combined = (combined == null) ? noLocal : LogicExpression.createAND(noLocal, combined);
+        }
+
+        BooleanExpression extra = info.getAdditionalPredicate();
+        if (extra != null) {
+            combined = (combined == null) ? extra : LogicExpression.createAND(extra, combined);
+        }
+        return combined;
+    }
+
+    /**
+     * Per-producer state, mainly the flow controller that the producer's
+     * messages are offered to.
+     */
+    class ProducerContext {
+        
+        private final ProducerInfo info;
+        private IFlowController<MessageDelivery> controller;
+        private String name;
+
+        /**
+         * Producers that request a window (window size &gt; 0) get their own
+         * controller whose limiter sends {@link ProducerAck} credits back to
+         * the client; all other producers share the connection's inbound
+         * controller.
+         */
+        public ProducerContext(final ProducerInfo info) {
+            this.info = info;
+            this.name = info.getProducerId().toString();
+
+            // Openwire only uses credit windows at the producer level for producers that request the feature.
+            final int windowSize = info.getWindowSize();
+            if( windowSize <= 0 ) {
+                controller = inboundController;
+                return;
+            }
+
+            Flow producerFlow = new Flow(info.getProducerId().toString(), false);
+
+            // The limiter is given half the window as its last argument.
+            WindowLimiter<MessageDelivery> creditLimiter = new WindowLimiter<MessageDelivery>(false, producerFlow, windowSize, windowSize/2) {
+                @Override
+                protected void sendCredit(int credit) {
+                    write(new ProducerAck(info.getProducerId(), credit));
+                }
+            };
+
+            FlowControllableAdapter routingAdapter = new FlowControllableAdapter() {
+                public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+                    route(controller, elem);
+                }
+                public String toString() {
+                    return name;
+                }
+            };
+            controller = new FlowController<MessageDelivery>(routingAdapter, producerFlow, creditLimiter, inboundMutex);
+        }
+    }
+    
+    /**
+     * Routes one inbound message to every bound target that accepts it and
+     * acknowledges the producer when the message asked for a response.
+     * <p>
+     * When targets exist, a persistent message is acknowledged from the
+     * completion callback installed on the delivery, and a non-persistent one
+     * is acknowledged immediately. When the router returns no targets the
+     * message is acknowledged immediately. In all cases the source controller
+     * is finally told the element has been dispatched.
+     *
+     * @param controller the source controller the element arrived through
+     * @param elem the delivery to route; must be an {@link OpenWireMessageDelivery}
+     */
+    protected void route(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+        // TODO:
+        // Consider doing some caching of this target list. Most producers always send to 
+        // the same destination.
+        Collection<DeliveryTarget> targets = broker.getRouter().route(elem);
+        
+        final Message message = ((OpenWireMessageDelivery)elem).getMessage();
+        if( targets != null ) { 
+            
+            if( message.isResponseRequired() ) {
+                // We need to ack the message once we ensure we won't lose it.
+                // We know we won't lose it once it's persisted or delivered to a consumer.
+                // Set up a callback to get notified once one of those happens.
+                if( message.isPersistent() ) {
+                    elem.setCompletionCallback(new Runnable(){
+                        public void run() {
+                            sendAck(message);
+                        }
+                    });
+                } else {
+                    // Let the client know the broker got the message.
+                    sendAck(message);
+                }
+            }
+            
+            // Deliver the message to all the targets..
+            // NOTE(review): a target whose getSink() returns null would make
+            // this loop throw NullPointerException.
+            for (DeliveryTarget dt : targets) {
+                if (dt.match(elem)) {
+                    dt.getSink().add(elem, controller);
+                }
+            }
+            
+        } else {
+            // Let the client know we got the message even though there 
+            // were no valid targets to deliver the message to.
+            if( message.isResponseRequired() ) {
+                sendAck(message);
+            }
+        }
+        controller.elementDispatched(elem);
+    }
+
+    /**
+     * Builds the acknowledgement for {@code command} and writes it to the
+     * client. Previously the Response built by ack() was discarded, so the
+     * producer never received it.
+     */
+    private void sendAck(Command command) {
+        Response response = ack(command);
+        if( response != null ) {
+            write(response);
+        }
+    }
+    
+    /**
+     * Creates the connection-wide inbound flow controller: a size-limited
+     * controller (window and resume threshold taken from inputWindowSize and
+     * inputResumeThreshold) that passes every accepted element to
+     * {@link #route}. The outbound side is still commented out below.
+     */
+    protected void initialize() {
+        
+        // Setup the input processing..
+        Flow inboundFlow = new Flow(name, false);
+        SizeLimiter<MessageDelivery> inboundLimiter = new SizeLimiter<MessageDelivery>(inputWindowSize, inputResumeThreshold);
+        FlowControllableAdapter routingAdapter = new FlowControllableAdapter() {
+            public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+                route(controller, elem);
+            }
+            public String toString() {
+                return name;
+            }
+        };
+        inboundController = new FlowController<MessageDelivery>(routingAdapter, inboundFlow, inboundLimiter, inboundMutex);
+
+//        ouboundFlow = new Flow(name, false);
+//        outboundLimiter = new WindowLimiter<MessageDelivery>(true, ouboundFlow, outputWindowSize, outputResumeThreshold);
+//        outputQueue = new SingleFlowRelay<MessageDelivery>(ouboundFlow, name + "-outbound", outboundLimiter);
+//        outboundController = outputQueue.getFlowController(ouboundFlow);
+//
+//        if (transport instanceof DispatchableTransport) {
+//            outputQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+//
+//                public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) {
+//                    write(message);
+//                }
+//            });
+//
+//        } else {
+//            blockingTransport = true;
+//            blockingWriter = Executors.newSingleThreadExecutor();
+//            outputQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+//                public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
+//                    write(message);
+//                };
+//            });
+//
+//        }
+    }
+    
+    /**
+     * Converts an OpenWire destination into the broker's {@link Destination}.
+     * <p>
+     * A composite destination becomes a {@link Destination.MultiDestination}
+     * of its recursively converted members; a queue or topic becomes a
+     * {@link Destination.SingleDestination} in the matching router domain,
+     * named after the destination's physical name.
+     *
+     * @param dest the OpenWire destination to convert
+     * @return the equivalent broker destination
+     * @throws IllegalArgumentException if the destination is neither
+     *         composite, a queue, nor a topic
+     */
+    static public Destination convert(ActiveMQDestination dest) {
+        if( dest.isComposite() ) {
+            ArrayList<Destination> converted = new ArrayList<Destination>();
+            for (ActiveMQDestination member : dest.getCompositeDestinations()) {
+                converted.add(convert(member));
+            }
+            return new Destination.MultiDestination(converted);
+        } 
+        AsciiBuffer domain;
+        if( dest.isQueue() ) {
+            domain = Router.QUEUE_DOMAIN;
+        } else if( dest.isTopic() ) {
+            // This must be an else-if: as two separate ifs, a queue fell into
+            // the topic check's else branch and was rejected as unsupported.
+            domain = Router.TOPIC_DOMAIN;
+        } else {
+            throw new IllegalArgumentException("Unsupported domain type: "+ dest);
+        }
+        return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName())); 
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.openwire;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Queue;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.PriorityDispatcher;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.Period;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.Mapper;
+
+public class MockBrokerTest extends TestCase {
+
+    protected static final int PERFORMANCE_SAMPLES = 3;
+
+    protected static final int IO_WORK_AMOUNT = 0;
+    protected static final int FANIN_COUNT = 10;
+    protected static final int FANOUT_COUNT = 10;
+
+    protected static final int PRIORITY_LEVELS = 10;
+    protected static final boolean USE_INPUT_QUEUES = true;
+
+    // Set to put senders and consumers on separate brokers.
+    protected boolean multibroker = false;
+
+    // Set to mock up ptp:
+    protected boolean ptp = false;
+
+    // Set to use tcp IO
+    protected boolean tcp = true;
+    // set to force marshalling even in the NON tcp case.
+    protected boolean forceMarshalling = false;
+
+    protected String sendBrokerURI;
+    protected String receiveBrokerURI;
+
+    // Sets the number of threads to use:
+    protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors();
+    protected boolean usePartitionedQueue = false;
+
+    protected int producerCount;
+    protected int consumerCount;
+    protected int destCount;
+
+    protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
+    protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
+
+    protected Broker sendBroker;
+    protected Broker rcvBroker;
+    protected ArrayList<Broker> brokers = new ArrayList<Broker>();
+    protected IDispatcher dispatcher;
+    protected final AtomicLong msgIdGenerator = new AtomicLong();
+
+    final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
+    final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
+
+    static public final Mapper<AsciiBuffer, MessageDelivery> KEY_MAPPER = new Mapper<AsciiBuffer, MessageDelivery>() {
+        public AsciiBuffer map(MessageDelivery element) {
+            return element.getMsgId();
+        }
+    };
+    /**
+     * Maps a delivery to one of 10 partitions (0 to 9), chosen from the hash
+     * code of its producer id.
+     */
+    static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
+        public Integer map(MessageDelivery element) {
+            // we modulo 10 to have at most 10 partitions which the producers
+            // gets split across. hashCode() may be negative and Java's %
+            // keeps the sign of the dividend, so take the absolute value of
+            // the remainder (always within -9..9, so Math.abs cannot overflow)
+            // to keep the result in 0..9.
+            return Math.abs(element.getProducerId().hashCode() % 10);
+        }
+    };
+
+    /**
+     * Creates and starts the dispatcher, then picks the send/receive broker
+     * URIs according to the {@code tcp} and {@code forceMarshalling} flags.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        dispatcher = createDispatcher();
+        dispatcher.start();
+
+        if (tcp) {
+            sendBrokerURI = "tcp://localhost:10000?wireFormat=proto2";
+            receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto2";
+        } else if (forceMarshalling) {
+            // In-VM pipe transport, with a wire format specified.
+            sendBrokerURI = "pipe://SendBroker?wireFormat=proto";
+            receiveBrokerURI = "pipe://ReceiveBroker?wireFormat=proto";
+        } else {
+            // In-VM pipe transport, with no wire format specified.
+            sendBrokerURI = "pipe://SendBroker";
+            receiveBrokerURI = "pipe://ReceiveBroker";
+        }
+    }
+
+    /**
+     * Creates the priority dispatch pool named "BrokerDispatcher", sized by
+     * {@code Broker.MAX_PRIORITY} and {@code asyncThreadPoolSize}.
+     */
+    protected IDispatcher createDispatcher() {
+        IDispatcher pool = PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize);
+        return pool;
+    }
+    
+    /**
+     * Scenario configured below: 2 producers, 2 destinations, 2 consumers.
+     *
+     * NOTE(review): the method name says 10/10/10, but the counts are all set
+     * to 2, which makes this identical to test_2_2_2 -- confirm whether the
+     * counts were lowered deliberately.
+     */
+    public void test_10_10_10() throws Exception {
+        producerCount = 2;
+        destCount = 2;
+        consumerCount = 2;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            // Always stop the services, even if reporting fails.
+            stopServices();
+        }
+    }
+
+    /**
+     * Scenario: 1 producer, 1 destination. consumerCount is not assigned
+     * here, so it keeps whatever value the field already has.
+     */
+    public void test_1_1_0() throws Exception {
+        producerCount = 1;
+        destCount = 1;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            // Always stop the services, even if reporting fails.
+            stopServices();
+        }
+    }
+
+    /** Scenario: 1 producer, 1 destination, 1 consumer. */
+    public void test_1_1_1() throws Exception {
+        producerCount = 1;
+        destCount = 1;
+        consumerCount = 1;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            // Always stop the services, even if reporting fails.
+            stopServices();
+        }
+    }
+
+    /**
+     * Scenario: FANIN_COUNT producers and FANOUT_COUNT consumers sharing a
+     * single destination.
+     */
+    public void test_10_1_10() throws Exception {
+        producerCount = FANIN_COUNT;
+        consumerCount = FANOUT_COUNT;
+        destCount = 1;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            // Always stop the services, even if reporting fails.
+            stopServices();
+        }
+    }
+
+    /** Scenario (fan-in): FANIN_COUNT producers, 1 destination, 1 consumer. */
+    public void test_10_1_1() throws Exception {
+        producerCount = FANIN_COUNT;
+        destCount = 1;
+        consumerCount = 1;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            // Always stop the services, even if reporting fails.
+            stopServices();
+        }
+    }
+
+    /** Scenario (fan-out): 1 producer, 1 destination, FANOUT_COUNT consumers. */
+    public void test_1_1_10() throws Exception {
+        producerCount = 1;
+        destCount = 1;
+        consumerCount = FANOUT_COUNT;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            // Always stop the services, even if reporting fails.
+            stopServices();
+        }
+    }
+
+    /** Scenario: 2 producers, 2 destinations, 2 consumers. */
+    public void test_2_2_2() throws Exception {
+        producerCount = 2;
+        destCount = 2;
+        consumerCount = 2;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            // Always stop the services, even if reporting fails.
+            stopServices();
+        }
+    }
+
+    /**
+     * Runs 2 producers, 2 destinations and 2 consumers, with the first
+     * consumer slowed down by a think time of 50 (units are not visible
+     * here). The intent is that the producer not feeding the slow consumer
+     * should still be able to send quickly.
+     *
+     * NOTE(review): no selectors are configured in this method and destCount
+     * is 2, so how producers pair up with consumers depends on
+     * createConnections() -- confirm this matches the intended scenario.
+     * 
+     * @throws Exception
+     */
+    public void test_2_2_2_SlowConsumer() throws Exception {
+        producerCount = 2;
+        destCount = 2;
+        consumerCount = 2;
+
+        createConnections();
+        // Only the first consumer is slowed down.
+        consumers.get(0).setThinkTime(50);
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            // Always stop the services, even if reporting fails.
+            stopServices();
+        }
+    }
+
+    /**
+     * Two producers, two destinations and two consumers, where the consumer
+     * and producer at each index are configured with the same "match&lt;i&gt;"
+     * string (as selector and as property respectively).
+     *
+     * @throws Exception if setup, the sampling loop or teardown fails
+     */
+    public void test_2_2_2_Selector() throws Exception {
+        producerCount = 2;
+        destCount = 2;
+        consumerCount = 2;
+        createConnections();
+
+        // Pair each consumer with the producer at the same index.
+        for (int idx = 0; idx < consumerCount; idx++) {
+            String matchKey = "match" + idx;
+            consumers.get(idx).setSelector(matchKey);
+            producers.get(idx).setProperty(matchKey);
+        }
+
+        // Bring everything up, sample the rates, and always tear down again.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Two producers share one destination and one consumer; the first
+     * producer is given priority 1 and a dedicated rate name so its throughput
+     * can be compared with the totals in the printed output. The expectation
+     * is that the high priority producer achieves the higher throughput, but
+     * the test only prints rates and does not assert on them.
+     *
+     * @throws Exception if setup, the sampling loop or teardown fails
+     */
+    public void test_2_1_1_HighPriorityProducer() throws Exception {
+        producerCount = 2;
+        destCount = 1;
+        consumerCount = 1;
+        createConnections();
+
+        RemoteProducer producer = producers.get(0);
+        producer.setPriority(1);
+        producer.getRate().setName("High Priority Producer Rate");
+
+        // Give the single consumer a small think time.
+        consumers.get(0).setThinkTime(1);
+
+        startServices();
+        try {
+            System.out.println("Checking rates for test: " + getName());
+            for (int sample = 0; sample < PERFORMANCE_SAMPLES; sample++) {
+                Period period = new Period();
+                // One sample every five seconds.
+                Thread.sleep(5 * 1000);
+                System.out.println(producer.getRate().getRateSummary(period));
+                System.out.println(totalProducerRate.getRateSummary(period));
+                System.out.println(totalConsumerRate.getRateSummary(period));
+                totalProducerRate.reset();
+                totalConsumerRate.reset();
+            }
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Variant of the high priority producer test: two producers share one
+     * destination and one consumer, and the first producer is configured with
+     * priority 1 plus a priority mod of 3.
+     * NOTE(review): presumably the priority mod makes only some of that
+     * producer's messages high priority (hence "mixed") - confirm against
+     * RemoteProducer. The test only prints rates and does not assert on them.
+     *
+     * @throws Exception if setup, the sampling loop or teardown fails
+     */
+    public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
+        producerCount = 2;
+        destCount = 1;
+        consumerCount = 1;
+        createConnections();
+
+        RemoteProducer producer = producers.get(0);
+        producer.setPriority(1);
+        producer.setPriorityMod(3);
+        producer.getRate().setName("High Priority Producer Rate");
+
+        // Give the single consumer a small think time.
+        consumers.get(0).setThinkTime(1);
+
+        startServices();
+        try {
+            System.out.println("Checking rates for test: " + getName());
+            for (int sample = 0; sample < PERFORMANCE_SAMPLES; sample++) {
+                Period period = new Period();
+                // One sample every five seconds.
+                Thread.sleep(5 * 1000);
+                System.out.println(producer.getRate().getRateSummary(period));
+                System.out.println(totalProducerRate.getRateSummary(period));
+                System.out.println(totalConsumerRate.getRateSummary(period));
+                totalProducerRate.reset();
+                totalConsumerRate.reset();
+            }
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Prints the aggregate producer and consumer rate summaries
+     * PERFORMANCE_SAMPLES times, sleeping five seconds before each sample and
+     * resetting both aggregates after printing.
+     *
+     * @throws InterruptedException if the thread is interrupted while sleeping
+     */
+    private void reportRates() throws InterruptedException {
+        String mode = ptp ? "ptp" : "topic";
+        System.out.println("Checking rates for test: " + getName() + ", " + mode);
+        for (int sample = 0; sample < PERFORMANCE_SAMPLES; sample++) {
+            Period period = new Period();
+            Thread.sleep(5 * 1000);
+            System.out.println(totalProducerRate.getRateSummary(period));
+            System.out.println(totalConsumerRate.getRateSummary(period));
+            totalProducerRate.reset();
+            totalConsumerRate.reset();
+        }
+    }
+
+    /**
+     * Builds the broker(s), destinations, producers and consumers for the
+     * current test according to producerCount, destCount, consumerCount, ptp
+     * and multibroker. Nothing is started here; see startServices().
+     *
+     * Producers and consumers are assigned to destinations round-robin by
+     * index (i % destCount).
+     */
+    private void createConnections() throws IOException, URISyntaxException {
+
+        // Either one shared broker, or separate send / receive brokers.
+        if (!multibroker) {
+            sendBroker = rcvBroker = createBroker("Broker", sendBrokerURI);
+            brokers.add(sendBroker);
+        } else {
+            sendBroker = createBroker("SendBroker", sendBrokerURI);
+            rcvBroker = createBroker("RcvBroker", receiveBrokerURI);
+            brokers.add(sendBroker);
+            brokers.add(rcvBroker);
+        }
+
+        // Destinations are named dest1..destN; in ptp mode each one also gets
+        // a queue registered on the send broker (and on the receive broker
+        // when running multibroker).
+        Destination[] dests = new Destination[destCount];
+        for (int d = 0; d < destCount; d++) {
+            Destination.SingleDestination single = new Destination.SingleDestination();
+            single.setName(new AsciiBuffer("dest" + (d + 1)));
+            if (ptp) {
+                single.setDomain(Router.QUEUE_DOMAIN);
+            } else {
+                single.setDomain(Router.TOPIC_DOMAIN);
+            }
+            dests[d] = single;
+
+            if (!ptp) {
+                continue;
+            }
+            Queue sendQueue = createQueue(sendBroker, dests[d]);
+            sendBroker.addQueue(sendQueue);
+            if (multibroker) {
+                Queue rcvQueue = createQueue(rcvBroker, dests[d]);
+                rcvBroker.addQueue(rcvQueue);
+            }
+        }
+
+        for (int p = 0; p < producerCount; p++) {
+            producers.add(createProducer(p, dests[p % destCount]));
+        }
+
+        for (int c = 0; c < consumerCount; c++) {
+            consumers.add(createConsumer(c, dests[c % destCount]));
+        }
+
+        // Create MultiBroker connections:
+        // if (multibroker) {
+        // Pipe<Message> pipe = new Pipe<Message>();
+        // sendBroker.createBrokerConnection(rcvBroker, pipe);
+        // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
+        // }
+    }
+
+    private RemoteConsumer createConsumer(int i, Destination destination) {
+        RemoteConsumer consumer = new RemoteConsumer();
+        consumer.setUri(rcvBroker.getConnectURI());
+        consumer.setDestination(destination);
+        consumer.setName("consumer" + (i + 1));
+        consumer.setTotalConsumerRate(totalConsumerRate);
+        consumer.setDispatcher(dispatcher);
+        return consumer;
+    }
+
+    private RemoteProducer createProducer(int id, Destination destination) {
+        RemoteProducer producer = new RemoteProducer();
+        producer.setUri(sendBroker.getConnectURI());
+        producer.setProducerId(id + 1);
+        producer.setName("producer" + (id + 1));
+        producer.setDestination(destination);
+        producer.setMessageIdGenerator(msgIdGenerator);
+        producer.setTotalProducerRate(totalProducerRate);
+        producer.setDispatcher(dispatcher);
+        return producer;
+    }
+
+    private Queue createQueue(Broker broker, Destination destination) {
+        Queue queue = new Queue();
+        queue.setBroker(broker);
+        queue.setDestination(destination);
+        queue.setKeyExtractor(KEY_MAPPER);
+        if (usePartitionedQueue) {
+            queue.setPartitionMapper(PARTITION_MAPPER);
+        }
+        return queue;
+    }
+
+    private Broker createBroker(String name, String uri) {
+        Broker broker = new Broker();
+        broker.setName(name);
+        broker.setUri(uri);
+        broker.setDispatcher(dispatcher);
+        return broker;
+    }
+
+    /**
+     * Stops all producers, then all consumers, then all brokers, and finally
+     * shuts down the dispatcher (when one is set).
+     *
+     * Every shutdown step is attempted even if an earlier one fails, so a
+     * single failing stop() cannot leave the remaining services or the
+     * dispatcher running after the test. The first failure encountered is
+     * rethrown once all steps have been attempted.
+     *
+     * @throws Exception the first exception raised by any shutdown step
+     */
+    private void stopServices() throws Exception {
+        Exception firstFailure = null;
+
+        for (RemoteProducer connection : producers) {
+            try {
+                connection.stop();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        for (RemoteConsumer connection : consumers) {
+            try {
+                connection.stop();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        for (Broker broker : brokers) {
+            try {
+                broker.stop();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        if (dispatcher != null) {
+            try {
+                dispatcher.shutdown();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+    /**
+     * Starts the brokers first, then the consumers, then the producers.
+     *
+     * @throws Exception if any start() call fails
+     */
+    private void startServices() throws Exception {
+        for (Broker each : brokers) {
+            each.start();
+        }
+
+        // Consumers are started ahead of the producers.
+        for (RemoteConsumer each : consumers) {
+            each.start();
+        }
+        for (RemoteProducer each : producers) {
+            each.start();
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,83 @@
+package org.apache.activemq.broker.openwire;
+
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.SessionInfo;
+
+/**
+ * Static helpers that build the OpenWire command objects (connection, session,
+ * producer and consumer infos, text messages and acks) used by the tests.
+ *
+ * Ids are drawn from plain static counters that are incremented without any
+ * synchronization, so the generated ids are only guaranteed unique when the
+ * factory methods are called from a single thread.
+ */
+public class OpenWireSupport {
+
+    /** Shared counter for connection, session, producer and consumer ids. */
+    static private long idGenerator;
+    /** Counter for message sequence ids. */
+    static private int msgIdGenerator;
+
+    /**
+     * Creates a non-browser consumer info for the destination with a prefetch
+     * size of 1000 and asynchronous dispatch disabled.
+     *
+     * @param sessionInfo session the consumer belongs to
+     * @param destination destination to consume from
+     * @return the new consumer info
+     */
+    public static ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception {
+        ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
+        info.setBrowser(false);
+        info.setDestination(destination);
+        info.setPrefetchSize(1000);
+        info.setDispatchAsync(false);
+        return info;
+    }
+
+    /**
+     * Returns the remove command for the given consumer.
+     *
+     * @param consumerInfo consumer to close
+     * @return the command produced by {@code consumerInfo.createRemoveCommand()}
+     */
+    public static RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) {
+        return consumerInfo.createRemoveCommand();
+    }
+
+    /**
+     * Creates a producer info in the given session using the next generated id.
+     *
+     * @param sessionInfo session the producer belongs to
+     * @return the new producer info
+     */
+    public static ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
+        return new ProducerInfo(sessionInfo, ++idGenerator);
+    }
+
+    /**
+     * Creates a session info on the given connection using the next generated id.
+     *
+     * @param connectionInfo connection the session belongs to
+     * @return the new session info
+     */
+    public static SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
+        return new SessionInfo(connectionInfo, ++idGenerator);
+    }
+
+    /**
+     * Creates a connection info whose connection id is "connection:" followed
+     * by the next generated id; the client id is set to the same value.
+     *
+     * @return the new connection info
+     */
+    public static ConnectionInfo createConnectionInfo() throws Exception {
+        ConnectionInfo info = new ConnectionInfo();
+        info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
+        info.setClientId(info.getConnectionId().getValue());
+        return info;
+    }
+
+    /**
+     * Creates a non-persistent text message with priority 4 (the JMS default
+     * priority) and no payload.
+     *
+     * @param producerInfo producer the message id is derived from
+     * @param destination destination of the message
+     * @return the new message
+     */
+    public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
+        return createMessage(producerInfo, destination, 4, null);
+    }
+
+    /**
+     * Creates a non-persistent text message.
+     *
+     * @param producerInfo producer the message id is derived from
+     * @param destination destination of the message
+     * @param priority JMS priority to set on the message
+     * @param payload text body, or {@code null} to leave the body unset
+     * @return the new message
+     * @throws IllegalStateException if the payload cannot be written to the
+     *         newly created message
+     */
+    public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int priority, String payload) {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setJMSPriority(priority);
+        message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+        message.setDestination(destination);
+        message.setPersistent(false);
+        if (payload != null) {
+            try {
+                message.setText(payload);
+            } catch (MessageNotWriteableException e) {
+                // Not expected for a message that was just created. Fail
+                // loudly instead of silently handing back a message without
+                // the requested payload.
+                throw new IllegalStateException("Could not set the payload on a newly created message", e);
+            }
+        }
+        return message;
+    }
+
+    /**
+     * Creates an ack from the given consumer for the given message.
+     *
+     * @param consumerInfo consumer sending the ack
+     * @param msg message supplying the destination and last message id
+     * @param count value stored as the ack's message count
+     * @param ackType ack type to set
+     * @return the new ack
+     */
+    public static MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) {
+        MessageAck ack = new MessageAck();
+        ack.setAckType(ackType);
+        ack.setConsumerId(consumerInfo.getConsumerId());
+        ack.setDestination(msg.getDestination());
+        ack.setLastMessageId(msg.getMessageId());
+        ack.setMessageCount(count);
+        return ack;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,169 @@
+package org.apache.activemq.broker.openwire;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.Connection;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.transport.DispatchableTransport;
+import org.apache.activemq.transport.TransportFactory;
+
+public class RemoteConsumer extends Connection {
+
+    private final MetricCounter consumerRate = new MetricCounter();
+
+    private MetricAggregator totalConsumerRate;
+    private long thinkTime;
+    private Destination destination;
+    private String selector;
+    private URI uri;
+
+    private boolean schedualWait;
+
+    protected final Object inboundMutex = new Object();
+    private FlowController<MessageDelivery> inboundController;
+    
+    public void start() throws Exception {
+        consumerRate.name("Consumer " + name + " Rate");
+        totalConsumerRate.add(consumerRate);
+
+        transport = TransportFactory.compositeConnect(uri);
+        if(transport instanceof DispatchableTransport)
+        {
+            DispatchableTransport dt = ((DispatchableTransport)transport);
+            dt.setName(name + "-client-transport");
+            dt.setDispatcher(getDispatcher());
+            schedualWait = true;
+        }
+        transport.setTransportListener(this);
+        transport.start();
+        
+        // Let the remote side know our name.
+        transport.oneway(name);
+        // Sending the destination acts as the subscribe.
+        transport.oneway(destination);
+        super.initialize();
+    }
+    
+    protected void initialize() {
+        
+        // Setup the input processing..
+        Flow flow = new Flow(name, false);
+        SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(inputWindowSize, inputResumeThreshold);
+        inboundController = new FlowController<MessageDelivery>(new FlowControllable<MessageDelivery>() {
+            public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+                messageReceived(controller, elem);
+            }
+            public String toString() {
+                return name;
+            }
+            public IFlowSink<MessageDelivery> getFlowSink() {
+                return null;
+            }
+            public IFlowSource<MessageDelivery> getFlowSource() {
+                return null;
+            }
+        }, flow, limiter, inboundMutex);
+    }
+    
+    public void onCommand(Object command) {
+        try {
+            if (command.getClass() == MessageDelivery.class) {
+                MessageDelivery msg = (MessageDelivery) command;
+                inboundController.add(msg, null);
+            } else {
+                onException(new Exception("Unrecognized command: " + command));
+            }
+        } catch (Exception e) {
+            onException(e);
+        }
+    }
+    
+    protected void messageReceived(final ISourceController<MessageDelivery> controller, final MessageDelivery elem) {
+        if( schedualWait ) {
+            if (thinkTime > 0) {
+                getDispatcher().schedule(new Runnable(){
+
+                    public void run() {
+                        consumerRate.increment();
+                        controller.elementDispatched(elem);
+                    }
+                    
+                }, thinkTime, TimeUnit.MILLISECONDS);
+                
+            }
+            else
+            {
+                consumerRate.increment();
+                controller.elementDispatched(elem);
+            }
+
+        } else {
+            if( thinkTime>0 ) {
+                try {
+                    Thread.sleep(thinkTime);
+                } catch (InterruptedException e) {
+                }
+            }
+            consumerRate.increment();
+            controller.elementDispatched(elem);
+        }
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public MetricAggregator getTotalConsumerRate() {
+        return totalConsumerRate;
+    }
+
+    public void setTotalConsumerRate(MetricAggregator totalConsumerRate) {
+        this.totalConsumerRate = totalConsumerRate;
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
+
+    public long getThinkTime() {
+        return thinkTime;
+    }
+
+    public void setThinkTime(long thinkTime) {
+        this.thinkTime = thinkTime;
+    }
+
+    public MetricCounter getConsumerRate() {
+        return consumerRate;
+    }
+
+    public String getSelector() {
+        return selector;
+    }
+
+    public void setSelector(String selector) {
+        this.selector = selector;
+    }
+
+    public URI getUri() {
+        return uri;
+    }
+
+    public void setUri(URI uri) {
+        this.uri = uri;
+    }}