You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/12 20:05:06 UTC

svn commit: r743841 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/flow/ test/java/org/apache/activemq/flow/

Author: chirino
Date: Thu Feb 12 19:05:05 2009
New Revision: 743841

URL: http://svn.apache.org/viewvc?rev=743841&view=rev
Log:
Test case is now working against a real I/O implementation.

Added:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
Removed:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalProducer.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestFlowManager.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Thu Feb 12 19:05:05 2009
@@ -91,7 +91,7 @@
         this.flow = flow;
         this.limiter = limiter == null ? new SizeLimiter<E>(0, 0) : limiter;
         this.mutex = mutex;
-        this.name = controllable.getFlowSource().toString();
+        this.name = controllable.toString();
     }
 
     public final IFlowLimiter<E> getLimiter() {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java Thu Feb 12 19:05:05 2009
@@ -61,22 +61,20 @@
     public static final int BLOCKING = 0;
     public static final int POLLING = 1;
     public static final int ASYNC = 2;
-    private final int dispatchMode;
 
     AbstractTestConnection(MockBroker broker, String name, Flow flow, Pipe<Message> p) {
         this.name = name;
         this.broker = broker;
         this.flow = flow;
-        this.dispatchMode = broker.dispatchMode;
 
         // Set up an input source:
         this.input = new NetworkSource(flow, name + "-INPUT", inputWindowSize, inputResumeThreshold);
 
         // Setup output queue:
-        if (broker.priorityLevels <= 1) {
-            this.output = broker.getFlowManager().createFlowQueue(flow, name + "-OUTPUT", outputQueueSize, resumeThreshold);
+        if (MockBrokerTest.PRIORITY_LEVELS <= 1) {
+            this.output = TestFlowManager.createFlowQueue(flow, name + "-OUTPUT", outputQueueSize, resumeThreshold);
         } else {
-            ExclusivePriorityQueue<Message> t = new ExclusivePriorityQueue<Message>(broker.priorityLevels, flow, name + "-OUTPUT", outputQueueSize, resumeThreshold);
+            ExclusivePriorityQueue<Message> t = new ExclusivePriorityQueue<Message>(MockBrokerTest.PRIORITY_LEVELS, flow, name + "-OUTPUT", outputQueueSize, resumeThreshold);
             t.setPriorityMapper(Message.PRIORITY_MAPPER);
             this.output = t;
         }
@@ -130,8 +128,8 @@
     }
 
     public final void simulateEncodingWork() {
-        if (broker.ioWorkAmount > 1) {
-            fib(broker.ioWorkAmount);
+        if (MockBrokerTest.IO_WORK_AMOUNT > 1) {
+            fib(MockBrokerTest.IO_WORK_AMOUNT);
         }
     }
 
@@ -187,7 +185,7 @@
 
     public final void start() throws Exception {
         running.set(true);
-        if (dispatchMode == BLOCKING) {
+        if (MockBrokerTest.DISPATCH_MODE == BLOCKING) {
             listener = new Thread(new Runnable() {
                 public void run() {
                     try {
@@ -212,8 +210,8 @@
             }, name + "-Sender");
             sender.start();
         } else {
-            output.setDispatcher(broker.dispatcher);
-            input.setDispatcher(broker.dispatcher);
+            output.setDispatcher(broker.getDispatcher());
+            input.setDispatcher(broker.getDispatcher());
             return;
         }
 
@@ -221,7 +219,7 @@
 
     public final void stop() throws Exception {
         running.set(false);
-        if (dispatchMode == BLOCKING) {
+        if (MockBrokerTest.DISPATCH_MODE == BLOCKING) {
             listener.interrupt();
             listener.join();
             sender.interrupt();
@@ -262,16 +260,16 @@
         public NetworkSource(Flow flow, String name, int capacity, int resumeThreshold) {
             super(name);
             if (flow == null) {
-                if (broker.useInputQueues) {
-                    inputQueue = broker.getFlowManager().createFlowQueue(flow, name, capacity, resumeThreshold);
+                if (MockBrokerTest.USE_INPUT_QUEUES) {
+                    inputQueue = TestFlowManager.createFlowQueue(flow, name, capacity, resumeThreshold);
                 } else {
                     inputQueue = null;
                 }
                 flowController = null;
             } else {
-                if (broker.useInputQueues) {
-                    if (broker.priorityLevels <= 1) {
-                        inputQueue = broker.getFlowManager().createFlowQueue(flow, name, capacity, resumeThreshold);
+                if (MockBrokerTest.USE_INPUT_QUEUES) {
+                    if (MockBrokerTest.PRIORITY_LEVELS <= 1) {
+                        inputQueue = TestFlowManager.createFlowQueue(flow, name, capacity, resumeThreshold);
                     } else {
                         SingleFlowPriorityQueue<Message> t = new SingleFlowPriorityQueue<Message>(flow, name, new SizeLimiter<Message>(capacity, resumeThreshold));
                         t.setPriorityMapper(Message.PRIORITY_MAPPER);
@@ -332,7 +330,7 @@
             // priority. Note that flow control from the input queue will limit
             // dispatch
             // of lower priority messages:
-            if (broker.useInputQueues) {
+            if (MockBrokerTest.USE_INPUT_QUEUES) {
                 dispatchContext.updatePriority(Message.MAX_PRIORITY);
             }
             dispatchContext.requestDispatch();

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java Thu Feb 12 19:05:05 2009
@@ -19,11 +19,12 @@
 import java.io.Serializable;
 import java.util.HashSet;
 
-import org.apache.activemq.flow.Flow;
 import org.apache.activemq.queue.Mapper;
 
 public class Message implements Serializable {
 
+    private static final long serialVersionUID = 6759761889075451996L;
+
     public static final Mapper<Integer, Message> PRIORITY_MAPPER = new Mapper<Integer, Message>() {
         public Integer map(Message element) {
             return element.priority;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java Thu Feb 12 19:05:05 2009
@@ -3,16 +3,14 @@
  */
 package org.apache.activemq.flow;
 
-import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.MockBrokerTest.BrokerConnection;
 import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
-import org.apache.activemq.flow.MockBrokerTest.Router;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.TransportFactory;
@@ -20,55 +18,24 @@
 
 class MockBroker implements TransportAcceptListener {
 
-    private final MockBrokerTest mockBrokerTest;
-    private final TestFlowManager flowMgr;
+    final Router router=  new Router();
     
     final ArrayList<RemoteConnection> connections = new ArrayList<RemoteConnection>();
-    final ArrayList<LocalProducer> producers = new ArrayList<LocalProducer>();
-    final ArrayList<LocalConsumer> consumers = new ArrayList<LocalConsumer>();
-    private final ArrayList<BrokerConnection> brokerConns = new ArrayList<BrokerConnection>();
-
-    private final HashMap<Destination, MockQueue> queues = new HashMap<Destination, MockQueue>();
-    final Router router;
-    private int pCount;
-    private int cCount;
-    private final String name;
-    public final int dispatchMode;
-
-    public final IDispatcher dispatcher;
-    public final int priorityLevels = MockBrokerTest.PRIORITY_LEVELS;
-    public final int ioWorkAmount = MockBrokerTest.IO_WORK_AMOUNT;
-    public final boolean useInputQueues = MockBrokerTest.USE_INPUT_QUEUES;
-    public TransportServer transportServer;
-
-    MockBroker(MockBrokerTest mockBrokerTest, String name) throws IOException, URISyntaxException {
-        this.mockBrokerTest = mockBrokerTest;
-        this.flowMgr = new TestFlowManager();
-        this.router = this.mockBrokerTest.new Router();
-        this.name = name;
-        this.dispatchMode = this.mockBrokerTest.dispatchMode;
-        this.dispatcher = this.mockBrokerTest.dispatcher;
-    }
-
-    TestFlowManager getFlowManager() {
-        return flowMgr;
-    }
+    final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
+    final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
+    final ArrayList<BrokerConnection> brokerConnections = new ArrayList<BrokerConnection>();
+    final HashMap<Destination, MockQueue> queues = new HashMap<Destination, MockQueue>();
+    
+    private TransportServer transportServer;
+    private String uri;
+    private String name;
+    private IDispatcher dispatcher;
+    private final AtomicBoolean stopping = new AtomicBoolean();
 
     public String getName() {
         return name;
     }
 
-    public void createProducerConnection(Destination destination) {
-        LocalProducer c = new LocalProducer(this.mockBrokerTest, "producer" + ++pCount, this, destination);
-        producers.add(c);
-    }
-
-    public void createConsumerConnection(Destination destination) {
-        LocalConsumer c = new LocalConsumer(this.mockBrokerTest, "consumer" + ++cCount, this, destination);
-        consumers.add(c);
-        subscribe(destination, c);
-    }
-
     public void subscribe(Destination destination, DeliveryTarget deliveryTarget) {
         if (destination.ptp) {
             queues.get(destination).addConsumer(deliveryTarget);
@@ -77,43 +44,44 @@
         }
     }
 
-    public void createClusterConnection(Destination destination) {
-        LocalConsumer c = new LocalConsumer(this.mockBrokerTest, "consumer" + ++cCount, this, destination);
-        consumers.add(c);
-        router.bind(c, destination);
-    }
-
-    public void createQueue(Destination destination) {
-        MockQueue queue = new MockQueue(this.mockBrokerTest, this, destination);
-        queues.put(destination, queue);
-    }
 
-    public void createBrokerConnection(MockBroker target, Pipe<Message> pipe) {
-        BrokerConnection bc = this.mockBrokerTest.new BrokerConnection(this, target, pipe);
-        // Set up the pipe for polled access
-        if (dispatchMode != AbstractTestConnection.BLOCKING) {
-            pipe.setMode(Pipe.POLLING);
-        }
-        // Add subscriptions for the target's destinations:
-        for (Destination d : target.router.lookupTable.keySet()) {
-            router.bind(bc, d);
-        }
-        brokerConns.add(bc);
-    }
+    public void addQueue(MockQueue queue) {
+        router.bind(queue, queue.getDestination());
+        queues.put(queue.getDestination(), queue);
+    }
+
+//    public void createClusterConnection(Destination destination) {
+//        RemoteConsumer c = new RemoteConsumer(this.mockBrokerTest, "consumer" + ++consumerCounter, this, destination);
+//        consumers.add(c);
+//        router.bind(c, destination);
+//    }
+//    public void createBrokerConnection(MockBroker target, Pipe<Message> pipe) {
+//        BrokerConnection bc = this.mockBrokerTest.new BrokerConnection(this, target, pipe);
+//        // Set up the pipe for polled access
+//        if (dispatchMode != AbstractTestConnection.BLOCKING) {
+//            pipe.setMode(Pipe.POLLING);
+//        }
+//        // Add subscriptions for the target's destinations:
+//        for (Destination d : target.router.lookupTable.keySet()) {
+//            router.bind(bc, d);
+//        }
+//        brokerConns.add(bc);
+//    }
 
     final void stopServices() throws Exception {
+        stopping.set(true);
         transportServer.stop();
         
-        for (RemoteConnection connection : connections) {
+        for (RemoteProducer connection : producers) {
             connection.stop();
         }
-        for (LocalProducer connection : producers) {
+        for (RemoteConsumer connection : consumers) {
             connection.stop();
         }
-        for (LocalConsumer connection : consumers) {
+        for (RemoteConnection connection : connections) {
             connection.stop();
         }
-        for (BrokerConnection connection : brokerConns) {
+        for (BrokerConnection connection : brokerConnections) {
             connection.stop();
         }
         for (MockQueue queue : queues.values()) {
@@ -125,25 +93,25 @@
 
     final void startServices() throws Exception {
         
-        transportServer = TransportFactory.bind(new URI("tcp://localhost:61616?wireFormat=test"));
+        transportServer = TransportFactory.bind(new URI(uri));
         transportServer.setAcceptListener(this);
         transportServer.start();
         
         dispatcher.start();
 
-        for (LocalConsumer connection : consumers) {
-            connection.start();
-        }
-
         for (MockQueue queue : queues.values()) {
             queue.start();
         }
+
+        for (RemoteConsumer connection : consumers) {
+            connection.start();
+        }
         
-        for (LocalProducer connection : producers) {
+        for (RemoteProducer connection : producers) {
             connection.start();
         }
 
-        for (BrokerConnection connection : brokerConns) {
+        for (BrokerConnection connection : brokerConnections) {
             connection.start();
         }
     }
@@ -152,6 +120,8 @@
         RemoteConnection connection = new RemoteConnection();
         connection.setBroker(this);
         connection.setTransport(transport);
+        connection.setPriorityLevels(MockBrokerTest.PRIORITY_LEVELS);
+        connection.setDispatcher(dispatcher);
         try {
             connection.start();
         } catch (Exception e1) {
@@ -160,6 +130,36 @@
     }
 
     public void onAcceptError(Exception error) {
+        System.out.println("Accept error: "+error);
         error.printStackTrace();
     }
+
+    public IDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setDispatcher(IDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    public URI getConnectURI() {
+        return transportServer.getConnectURI();
+    }
+
+    public boolean isStopping() {
+        return stopping.get();
+    }
+    
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Thu Feb 12 19:05:05 2009
@@ -19,18 +19,12 @@
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import junit.framework.TestCase;
 
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.dispatch.PriorityPooledDispatcher;
-import org.apache.activemq.flow.FlowController;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.Period;
 import org.apache.activemq.queue.Mapper;
@@ -50,10 +44,10 @@
     private boolean multibroker = false;
 
     // Set to mockup up ptp:
-    boolean ptp = true;
+    boolean ptp = false;
 
     // Can be set to BLOCKING, POLLING or ASYNC
-    final int dispatchMode = AbstractTestConnection.ASYNC;
+    public final static int DISPATCH_MODE = AbstractTestConnection.ASYNC;
     // Set's the number of threads to use:
     private final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors();
     boolean usePartitionedQueue = false;
@@ -77,7 +71,6 @@
     }
 
     final AtomicLong msgIdGenerator = new AtomicLong();
-    final AtomicInteger prodcuerIdGenerator = new AtomicInteger();
 
     class BrokerConnection extends AbstractTestConnection implements DeliveryTarget {
         private final Pipe<Message> pipe;
@@ -148,28 +141,6 @@
         }
     };
 
-    class Router {
-        final HashMap<Destination, Collection<DeliveryTarget>> lookupTable = new HashMap<Destination, Collection<DeliveryTarget>>();
-
-        final synchronized void bind(DeliveryTarget dt, Destination destination) {
-            Collection<DeliveryTarget> targets = lookupTable.get(destination);
-            if (targets == null) {
-                targets = new ArrayList<DeliveryTarget>();
-                lookupTable.put(destination, targets);
-            }
-            targets.add(dt);
-        }
-
-        final void route(ISourceController<Message> source, Message msg) {
-            Collection<DeliveryTarget> targets = lookupTable.get(msg.getDestination());
-            for (DeliveryTarget dt : targets) {
-                if (dt.match(msg)) {
-                    dt.getSink().add(msg, source);
-                }
-            }
-        }
-    }
-
     private void reportRates() throws InterruptedException {
         System.out.println("Checking rates for test: " + getName());
         for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
@@ -195,9 +166,9 @@
         consumerCount = 1;
 
         createConnections();
-        LocalProducer producer = sendBroker.producers.get(0);
-        producer.msgPriority = 1;
-        producer.producerRate.setName("High Priority Producer Rate");
+        RemoteProducer producer = sendBroker.producers.get(0);
+        producer.setPriority(1);
+        producer.getRate().setName("High Priority Producer Rate");
 
         rcvBroker.consumers.get(0).setThinkTime(1);
 
@@ -209,7 +180,7 @@
             for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
                 Period p = new Period();
                 Thread.sleep(1000 * 5);
-                System.out.println(producer.producerRate.getRateSummary(p));
+                System.out.println(producer.getRate().getRateSummary(p));
                 System.out.println(totalProducerRate.getRateSummary(p));
                 System.out.println(totalConsumerRate.getRateSummary(p));
                 totalProducerRate.reset();
@@ -233,10 +204,10 @@
         consumerCount = 1;
 
         createConnections();
-        LocalProducer producer = sendBroker.producers.get(0);
-        producer.msgPriority = 1;
-        producer.priorityMod = 3;
-        producer.producerRate.setName("High Priority Producer Rate");
+        RemoteProducer producer = sendBroker.producers.get(0);
+        producer.setPriority(1);
+        producer.setPriorityMod(3);
+        producer.getRate().setName("High Priority Producer Rate");
 
         rcvBroker.consumers.get(0).setThinkTime(1);
 
@@ -248,7 +219,7 @@
             for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
                 Period p = new Period();
                 Thread.sleep(1000 * 5);
-                System.out.println(producer.producerRate.getRateSummary(p));
+                System.out.println(producer.getRate().getRateSummary(p));
                 System.out.println(totalProducerRate.getRateSummary(p));
                 System.out.println(totalConsumerRate.getRateSummary(p));
                 totalProducerRate.reset();
@@ -389,7 +360,9 @@
 
         // Add properties to match producers to their consumers
         for (int i = 0; i < consumerCount; i++) {
-            rcvBroker.consumers.get(i).selector = sendBroker.producers.get(i).property = "match" + i;
+            String property = "match" + i;
+            rcvBroker.consumers.get(i).setSelector(property);
+            sendBroker.producers.get(i).setProperty(property);
         }
 
         // Start 'em up.
@@ -400,21 +373,21 @@
             stopServices();
         }
     }
-
+    
     private void createConnections() throws IOException, URISyntaxException {
 
-        if (dispatchMode == AbstractTestConnection.ASYNC || dispatchMode == AbstractTestConnection.POLLING) {
+        if (DISPATCH_MODE == AbstractTestConnection.ASYNC || DISPATCH_MODE == AbstractTestConnection.POLLING) {
             dispatcher = new PriorityPooledDispatcher("BrokerDispatcher", asyncThreadPoolSize, Message.MAX_PRIORITY);
             FlowController.setFlowExecutor(dispatcher.createPriorityExecutor(Message.MAX_PRIORITY));
         }
 
         if (multibroker) {
-            sendBroker = new MockBroker(this, "SendBroker");
-            rcvBroker = new MockBroker(this, "RcvBroker");
+            sendBroker = createBroker("SendBroker", "tcp://localhost:10000?wireFormat=test");
             brokers.add(sendBroker);
+            rcvBroker = createBroker("RcvBroker", "tcp://localhost:20000?wireFormat=test");
             brokers.add(rcvBroker);
         } else {
-            sendBroker = rcvBroker = new MockBroker(this, "Broker");
+            sendBroker = rcvBroker = createBroker("Broker", "tcp://localhost:10000?wireFormat=test");
             brokers.add(sendBroker);
         }
 
@@ -423,26 +396,72 @@
         for (int i = 0; i < destCount; i++) {
             dests[i] = new Destination("dest" + (i + 1), ptp);
             if (ptp) {
-                sendBroker.createQueue(dests[i]);
+                MockQueue queue = createQueue(sendBroker, dests[i]);
+                sendBroker.addQueue(queue);
                 if (multibroker) {
-                    rcvBroker.createQueue(dests[i]);
+                    queue = createQueue(rcvBroker, dests[i]);
+                    rcvBroker.addQueue(queue);
                 }
             }
         }
 
         for (int i = 0; i < producerCount; i++) {
-            sendBroker.createProducerConnection(dests[i % destCount]);
+            Destination destination = dests[i % destCount];
+            RemoteProducer producer = createProducer(i, destination);
+            sendBroker.producers.add(producer);
         }
+        
         for (int i = 0; i < consumerCount; i++) {
-            rcvBroker.createConsumerConnection(dests[i % destCount]);
+            Destination destination = dests[i % destCount];
+            RemoteConsumer consumer = createConsumer(i, destination);
+            sendBroker.consumers.add(consumer);
         }
 
         // Create MultiBroker connections:
-        if (multibroker) {
-            Pipe<Message> pipe = new Pipe<Message>();
-            sendBroker.createBrokerConnection(rcvBroker, pipe);
-            rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
-        }
+//        if (multibroker) {
+//            Pipe<Message> pipe = new Pipe<Message>();
+//            sendBroker.createBrokerConnection(rcvBroker, pipe);
+//            rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
+//        }
+    }
+
+    private RemoteConsumer createConsumer(int i, Destination destination) {
+        RemoteConsumer consumer = new RemoteConsumer();
+        consumer.setBroker(rcvBroker);
+        consumer.setDestination(destination);
+        consumer.setName("consumer"+(i+1));
+        consumer.setTotalConsumerRate(totalConsumerRate);
+        return consumer;
+    }
+
+    private RemoteProducer createProducer(int id, Destination destination) {
+        RemoteProducer producer = new RemoteProducer();
+        producer.setBroker(sendBroker);
+        producer.setProducerId(id+1);
+        producer.setName("producer" +(id+1));
+        producer.setDestination(destination);
+        producer.setMessageIdGenerator(msgIdGenerator);
+        producer.setTotalProducerRate(totalProducerRate);
+        return producer;
+    }
+
+    private MockQueue createQueue(MockBroker broker, Destination destination) {
+        MockQueue queue = new MockQueue();
+        queue.setBroker(broker);
+        queue.setDestination(destination);
+        queue.setKeyExtractor(keyExtractor);
+        if( usePartitionedQueue ) {
+            queue.setPartitionMapper(partitionMapper);
+        }
+        return queue;
+    }
+
+    private MockBroker createBroker(String name, String uri) {
+        MockBroker broker = new MockBroker();
+        broker.setName(name);
+        broker.setUri(uri);
+        broker.setDispatcher(dispatcher);
+        return broker;
     }
 
     private void stopServices() throws Exception {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Thu Feb 12 19:05:05 2009
@@ -7,6 +7,7 @@
 
 import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
 import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.Mapper;
 import org.apache.activemq.queue.PartitionedQueue;
 import org.apache.activemq.queue.SharedPriorityQueue;
 import org.apache.activemq.queue.SharedQueue;
@@ -14,30 +15,24 @@
 
 class MockQueue implements MockBrokerTest.DeliveryTarget {
 
-    private final MockBrokerTest mockBrokerTest;
     HashMap<DeliveryTarget, Subscription<Message>> subs = new HashMap<DeliveryTarget, Subscription<Message>>();
-    private final Destination destination;
-    private final IQueue<Long, Message> queue;
-    private final MockBroker broker;
-
-    MockQueue(MockBrokerTest mockBrokerTest, MockBroker broker, Destination destination) {
-        this.mockBrokerTest = mockBrokerTest;
-        this.broker = broker;
-        this.destination = destination;
-        this.queue = createQueue();
-        broker.router.bind(this, destination);
-    }
+    private Destination destination;
+    private IQueue<Long, Message> queue;
+    private MockBroker broker;
+    
+    private Mapper<Integer, Message> partitionMapper;
+    private Mapper<Long, Message> keyExtractor;
 
     private IQueue<Long, Message> createQueue() {
 
-        if (this.mockBrokerTest.usePartitionedQueue) {
+        if (partitionMapper!=null) {
             PartitionedQueue<Integer, Long, Message> queue = new PartitionedQueue<Integer, Long, Message>() {
                 @Override
                 protected IQueue<Long, Message> cratePartition(Integer partitionKey) {
                     return createSharedFlowQueue();
                 }
             };
-            queue.setPartitionMapper(this.mockBrokerTest.partitionMapper);
+            queue.setPartitionMapper(partitionMapper);
             queue.setResourceName(destination.getName());
             return queue;
         } else {
@@ -46,33 +41,28 @@
     }
 
     private IQueue<Long, Message> createSharedFlowQueue() {
-        if (broker.priorityLevels > 1) {
-            PrioritySizeLimiter<Message> limiter = new PrioritySizeLimiter<Message>(100, 1, broker.priorityLevels);
+        if (MockBrokerTest.PRIORITY_LEVELS > 1) {
+            PrioritySizeLimiter<Message> limiter = new PrioritySizeLimiter<Message>(100, 1, MockBrokerTest.PRIORITY_LEVELS);
             limiter.setPriorityMapper(Message.PRIORITY_MAPPER);
             SharedPriorityQueue<Long, Message> queue = new SharedPriorityQueue<Long, Message>(destination.getName(), limiter);
-            queue.setKeyMapper(this.mockBrokerTest.keyExtractor);
+            queue.setKeyMapper(keyExtractor);
             queue.setAutoRelease(true);
-            queue.setDispatcher(broker.dispatcher);
+            queue.setDispatcher(broker.getDispatcher());
             return queue;
         } else {
             SizeLimiter<Message> limiter = new SizeLimiter<Message>(100, 1);
             SharedQueue<Long, Message> queue = new SharedQueue<Long, Message>(destination.getName(), limiter);
-            queue.setKeyMapper(this.mockBrokerTest.keyExtractor);
+            queue.setKeyMapper(keyExtractor);
             queue.setAutoRelease(true);
-            queue.setDispatcher(broker.dispatcher);
+            queue.setDispatcher(broker.getDispatcher());
             return queue;
         }
     }
 
     public final void deliver(ISourceController<Message> source, Message msg) {
-
         queue.add(msg, source);
     }
-
-    public String getSelector() {
-        return null;
-    }
-
+    
     public final Destination getDestination() {
         return destination;
     }
@@ -113,6 +103,7 @@
     }
 
     public void start() throws Exception {
+        queue = createQueue();
     }
 
     public void stop() throws Exception {
@@ -126,4 +117,32 @@
         return true;
     }
 
+    public MockBroker getBroker() {
+        return broker;
+    }
+
+    public void setBroker(MockBroker broker) {
+        this.broker = broker;
+    }
+
+    public Mapper<Integer, Message> getPartitionMapper() {
+        return partitionMapper;
+    }
+
+    public void setPartitionMapper(Mapper<Integer, Message> partitionMapper) {
+        this.partitionMapper = partitionMapper;
+    }
+
+    public Mapper<Long, Message> getKeyExtractor() {
+        return keyExtractor;
+    }
+
+    public void setKeyExtractor(Mapper<Long, Message> keyExtractor) {
+        this.keyExtractor = keyExtractor;
+    }
+
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java Thu Feb 12 19:05:05 2009
@@ -40,7 +40,6 @@
 
     public void setBroker(MockBroker broker) {
         this.broker = broker;
-
     }
 
     public void setTransport(Transport transport) {
@@ -48,15 +47,57 @@
     }
 
     public void start() throws Exception {
+        transport.setTransportListener(this);
+        transport.start();
+    }
 
+    public void stop() throws Exception {
+        stopping.set(true);
+        writer.shutdown();
+        if (transport != null) {
+            transport.stop();
+        }
+    }
+
+    public void onCommand(Object command) {
+        try {
+            // First command in should be the name of the connection
+            if( name==null ) {
+                name = (String) command;
+                initialize();
+            } else if (command.getClass() == Message.class) {
+                Message msg = (Message) command;
+                // Use the flow controller to send the message on so that we do
+                // not overflow
+                // the broker.
+                while (!inboundController.offer(msg, null)) {
+                    inboundController.waitForFlowUnblock();
+                }
+            } else if (command.getClass() == Destination.class) {
+                // This is a subscription request
+                Destination destination = (Destination) command;
+                broker.subscribe(destination, this);
+            }
+        } catch (Exception e) {
+            onException(e);
+        }
+    }
+
+    private void initialize() {
         // Setup the input processing..
         SizeLimiter<Message> limiter = new SizeLimiter<Message>(inputWindowSize, inputResumeThreshold);
         Flow flow = new Flow(name + "-inbound", false);
         inboundController = new FlowController<Message>(new FlowControllable<Message>() {
             public void flowElemAccepted(ISourceController<Message> controller, Message elem) {
                 broker.router.route(controller, elem);
+                inboundController.elementDispatched(elem);
             }
 
+            @Override
+            public String toString() {
+                return name;
+            }
+            
             public IFlowSink<Message> getFlowSink() {
                 return null;
             }
@@ -73,7 +114,7 @@
             ExclusiveQueue<Message> queue = new ExclusiveQueue<Message>(flow, flow.getFlowName(), limiter);
             this.output = queue;
         } else {
-            ExclusivePriorityQueue<Message> t = new ExclusivePriorityQueue<Message>(broker.priorityLevels, flow, name + "-outbound", outputWindowSize, outputResumeThreshold);
+            ExclusivePriorityQueue<Message> t = new ExclusivePriorityQueue<Message>(priorityLevels, flow, name + "-outbound", outputWindowSize, outputResumeThreshold);
             t.setPriorityMapper(Message.PRIORITY_MAPPER);
             this.output = t;
         }
@@ -98,47 +139,15 @@
                 });
             }
         });
-
-        transport.setTransportListener(this);
-        transport.start();
-    }
-
-    public void stop() throws Exception {
-        stopping.set(true);
-        writer.shutdown();
-        if (transport != null) {
-            transport.stop();
-        }
-    }
-
-    public void onCommand(Object command) {
-        try {
-            if (command.getClass() == Message.class) {
-                Message msg = (Message) command;
-                // Use the flow controller to send the message on so that we do
-                // not overflow
-                // the broker.
-                while (!inboundController.offer(msg, null)) {
-                    inboundController.waitForFlowUnblock();
-                }
-            } else if (command.getClass() == Destination.class) {
-                // This is a subscription request
-                Destination destination = (Destination) command;
-                broker.subscribe(destination, this);
-            }
-        } catch (Exception e) {
-            onException(e);
-        }
     }
 
     public void onException(IOException error) {
-        if (!stopping.get()) {
-            error.printStackTrace();
-        }
+        onException((Exception)error);
     }
 
     public void onException(Exception error) {
-        if (!stopping.get()) {
+        if (!stopping.get() && !broker.isStopping()) {
+            System.out.println("RemoteConnection error: "+error);
             error.printStackTrace();
         }
     }
@@ -153,10 +162,6 @@
         return name;
     }
 
-    public void setName(String name) {
-        this.name = name;
-    }
-
     public int getPriorityLevels() {
         return priorityLevels;
     }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java Thu Feb 12 19:05:05 2009
@@ -21,16 +21,19 @@
     private MetricAggregator totalConsumerRate;
     private long thinkTime;
     private Destination destination;
+    private String selector;
     
     public void start() throws Exception {
         consumerRate.name("Consumer " + name + " Rate");
         totalConsumerRate.add(consumerRate);
 
-        URI uri = broker.transportServer.getConnectURI();
-        transport = TransportFactory.connect(uri);
+        URI uri = broker.getConnectURI();
+        transport = TransportFactory.compositeConnect(uri);
         transport.setTransportListener(this);
         transport.start();
         
+        // Let the remote side know our name.
+        transport.oneway(name);
         // Sending the destination acts as the subscribe.
         transport.oneway(destination);
     }
@@ -61,6 +64,7 @@
 
     public void onException(IOException error) {
         if( !stopping.get() ) {
+            System.out.println("RemoteConsumer error: "+error);
             error.printStackTrace();
         }
     }
@@ -77,15 +81,6 @@
         this.broker = broker;
     }
 
-    public Transport getTransport() {
-        return transport;
-    }
-
-    public void setTransport(Transport transport) {
-        this.transport = transport;
-    }
-
-
     public MockBroker getBroker() {
         return broker;
     }
@@ -98,8 +93,8 @@
         return totalConsumerRate;
     }
 
-    public void setTotalConsumerRate(MetricAggregator totalProducerRate) {
-        this.totalConsumerRate = totalProducerRate;
+    public void setTotalConsumerRate(MetricAggregator totalConsumerRate) {
+        this.totalConsumerRate = totalConsumerRate;
     }
 
     public Destination getDestination() {
@@ -108,4 +103,24 @@
 
     public void setDestination(Destination destination) {
         this.destination = destination;
+    }
+
+    public long getThinkTime() {
+        return thinkTime;
+    }
+
+    public void setThinkTime(long thinkTime) {
+        this.thinkTime = thinkTime;
+    }
+
+    public MetricCounter getConsumerRate() {
+        return consumerRate;
+    }
+
+    public String getSelector() {
+        return selector;
+    }
+
+    public void setSelector(String selector) {
+        this.selector = selector;
     }}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Thu Feb 12 19:05:05 2009
@@ -14,7 +14,7 @@
 public class RemoteProducer implements TransportListener, Runnable {
 
     private final AtomicBoolean stopping = new AtomicBoolean();
-    private final MetricCounter producerRate = new MetricCounter();
+    private final MetricCounter rate = new MetricCounter();
 
     private Transport transport;
     private MockBroker broker;
@@ -30,14 +30,17 @@
     private MetricAggregator totalProducerRate;
     
     public void start() throws Exception {
-        producerRate.name("Producer " + name + " Rate");
-        totalProducerRate.add(producerRate);
+        rate.name("Producer " + name + " Rate");
+        totalProducerRate.add(rate);
 
-        URI uri = broker.transportServer.getConnectURI();
-        transport = TransportFactory.connect(uri);
+        URI uri = broker.getConnectURI();
+        transport = TransportFactory.compositeConnect(uri);
         transport.setTransportListener(this);
         transport.start();
         
+        // Let the remote side know our name.
+        transport.oneway(name);
+
         thread = new Thread(this, name);
         thread.start();
     }
@@ -46,9 +49,9 @@
         stopping.set(true);
         if( transport!=null ) {
             transport.stop();
-            transport=null;
         }
         thread.join();
+        transport=null;
     }
 
     public void run() {
@@ -66,6 +69,7 @@
                 }
                 
                 transport.oneway(next);
+                rate.increment();
             }
         } catch (IOException e) {
             onException(e);
@@ -78,6 +82,7 @@
 
     public void onException(IOException error) {
         if( !stopping.get() ) {
+            System.out.println("RemoteProducer error: "+error);
             error.printStackTrace();
         }
     }
@@ -94,14 +99,6 @@
         this.broker = broker;
     }
 
-    public Transport getTransport() {
-        return transport;
-    }
-
-    public void setTransport(Transport transport) {
-        this.transport = transport;
-    }
-
     public AtomicLong getMessageIdGenerator() {
         return messageIdGenerator;
     }
@@ -126,14 +123,6 @@
         this.priorityMod = priorityMod;
     }
 
-    public int getCounter() {
-        return counter;
-    }
-
-    public void setCounter(int msgCounter) {
-        this.counter = msgCounter;
-    }
-
     public int getProducerId() {
         return producerId;
     }
@@ -172,4 +161,8 @@
 
     public void setTotalProducerRate(MetricAggregator totalProducerRate) {
         this.totalProducerRate = totalProducerRate;
+    }
+
+    public MetricCounter getRate() {
+        return rate;
     }}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java?rev=743841&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java Thu Feb 12 19:05:05 2009
@@ -0,0 +1,32 @@
+/**
+ * 
+ */
+package org.apache.activemq.flow;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
+
+public class Router {
+    final HashMap<Destination, Collection<DeliveryTarget>> lookupTable = new HashMap<Destination, Collection<DeliveryTarget>>();
+
+    final synchronized void bind(DeliveryTarget dt, Destination destination) {
+        Collection<DeliveryTarget> targets = lookupTable.get(destination);
+        if (targets == null) {
+            targets = new ArrayList<DeliveryTarget>();
+            lookupTable.put(destination, targets);
+        }
+        targets.add(dt);
+    }
+
+    final void route(ISourceController<Message> source, Message msg) {
+        Collection<DeliveryTarget> targets = lookupTable.get(msg.getDestination());
+        for (DeliveryTarget dt : targets) {
+            if (dt.match(msg)) {
+                dt.getSink().add(msg, source);
+            }
+        }
+    }
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestFlowManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestFlowManager.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestFlowManager.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestFlowManager.java Thu Feb 12 19:05:05 2009
@@ -23,11 +23,11 @@
 import org.apache.activemq.queue.MultiFlowQueue;
 
 public class TestFlowManager {
-    synchronized <T> IFlowQueue<T> createQueue(String name, Flow flow, int capacity, int resumeThreshold) {
+    static  <T> IFlowQueue<T> createQueue(String name, Flow flow, int capacity, int resumeThreshold) {
         return createFlowQueue(flow, name, capacity, resumeThreshold);
     }
 
-    public synchronized <T> IFlowQueue<T> createFlowQueue(Flow flow, String name, int capacity, int resumeThreshold) {
+    static public <T> IFlowQueue<T> createFlowQueue(Flow flow, String name, int capacity, int resumeThreshold) {
         IFlowQueue<T> queue;
         if (flow != null) {
             queue = new ExclusiveQueue<T>(flow, name, new SizeLimiter<T>(capacity, resumeThreshold));
@@ -37,7 +37,7 @@
         return queue;
     }
 
-    public Flow createFlow(String name) {
+    static public Flow createFlow(String name) {
         Flow rc = new Flow(name, false);
         return rc;
     }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java Thu Feb 12 19:05:05 2009
@@ -1,12 +1,12 @@
 package org.apache.activemq.flow;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.io.OutputStream;
 
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -15,17 +15,25 @@
 
 public class TestWireFormatFactory implements WireFormatFactory {
 
-    public class TestWireFormat implements WireFormat {
+    static public class TestWireFormat implements WireFormat {
 
         public void marshal(Object value, DataOutput out) throws IOException {
-            ObjectOutputStream oos = new ObjectOutputStream((OutputStream) out);
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
             oos.writeObject(value);
-            oos.reset();
-            oos.flush();
+            oos.close();
+            
+            byte[] data = baos.toByteArray();
+            out.writeInt(data.length);
+            out.write(data);
         }
 
         public Object unmarshal(DataInput in) throws IOException {
-            ObjectInputStream ois = new ObjectInputStream((InputStream) in);
+            byte data[] = new byte[in.readInt()];
+            in.readFully(data);
+            
+            ByteArrayInputStream is = new ByteArrayInputStream(data);
+            ObjectInputStream ois = new ObjectInputStream(is);
             try {
                 return ois.readObject();
             } catch (ClassNotFoundException e) {
@@ -53,6 +61,6 @@
 
 	public WireFormat createWireFormat() {
 		return new TestWireFormat();
-	}
+	}	
 
 }