You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/12 19:04:50 UTC

svn commit: r752957 [2/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/dispatch/ main/java/org/apache/activemq/flow/ main/java/org/apache/activemq/queue/ main/java/org/apache/activemq/transport/ test/java/org/apache/activemq/...

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Thu Mar 12 18:04:49 2009
@@ -16,10 +16,7 @@
  */
 package org.apache.activemq.flow;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicLong;
 
 import junit.framework.TestCase;
 
@@ -28,21 +25,19 @@
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.Commands.Destination.DestinationBean;
 import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
-import org.apache.activemq.metric.MetricAggregator;
-import org.apache.activemq.metric.Period;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.queue.Mapper;
 
 public class MockBrokerTest extends TestCase {
 
-    protected static final int PERFORMANCE_SAMPLES = 3;
+    protected static final int PERFORMANCE_SAMPLES = 5;
+    protected static final int SAMPLING_FREQUENCY = 5;
 
-    protected static final int IO_WORK_AMOUNT = 0;
     protected static final int FANIN_COUNT = 10;
     protected static final int FANOUT_COUNT = 10;
 
     protected static final int PRIORITY_LEVELS = 10;
-    protected static final boolean USE_INPUT_QUEUES = true;
+    protected static final boolean USE_INPUT_QUEUES = false;
 
     // Set to put senders and consumers on separate brokers.
     protected boolean multibroker = false;
@@ -59,21 +54,16 @@
     protected String receiveBrokerURI;
 
     // Set's the number of threads to use:
-    protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors();
+    protected static final boolean SEPARATE_CLIENT_DISPATCHER = false;
+    protected final int threadsPerDispatcher = Runtime.getRuntime().availableProcessors();
     protected boolean usePartitionedQueue = false;
 
-    protected int producerCount;
-    protected int consumerCount;
-    protected int destCount;
-
-    protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
-    protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
-
+    protected ArrayList<MockBroker> brokers = new ArrayList<MockBroker>();
     protected MockBroker sendBroker;
     protected MockBroker rcvBroker;
-    protected ArrayList<MockBroker> brokers = new ArrayList<MockBroker>();
+    protected MockClient client;
+
     protected IDispatcher dispatcher;
-    protected final AtomicLong msgIdGenerator = new AtomicLong();
 
     static public final Mapper<Long, Message> KEY_MAPPER = new Mapper<Long, Message>() {
         public Long map(Message element) {
@@ -90,8 +80,6 @@
 
     @Override
     protected void setUp() throws Exception {
-        dispatcher = createDispatcher();
-        dispatcher.start();
         if (tcp) {
             sendBrokerURI = "tcp://localhost:10000?wireFormat=proto";
             receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto";
@@ -106,119 +94,79 @@
         }
     }
 
-    protected IDispatcher createDispatcher() {
-        return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, asyncThreadPoolSize);
+    protected IDispatcher createDispatcher(String name) {
+        return PriorityDispatcher.createPriorityDispatchPool(name, Message.MAX_PRIORITY, threadsPerDispatcher);
     }
-    
+
     public void test_1_1_0() throws Exception {
-        producerCount = 1;
-        destCount = 1;
 
-        createConnections();
+        client = new MockClient();
+        client.setNumProducers(1);
+        client.setDestCount(1);
+        client.setNumConsumers(0);
 
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
+        createConnections(1);
+        runTestCase();
     }
 
     public void test_1_1_1() throws Exception {
-        producerCount = 1;
-        destCount = 1;
-        consumerCount = 1;
-
-        createConnections();
+        client = new MockClient();
+        client.setNumProducers(1);
+        client.setDestCount(1);
+        client.setNumConsumers(1);
 
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
+        createConnections(1);
+        runTestCase();
     }
 
     public void test_10_10_10() throws Exception {
-        producerCount = FANIN_COUNT;
-        destCount = FANIN_COUNT;
-        consumerCount = FANOUT_COUNT;
-
-        createConnections();
+        client = new MockClient();
+        client.setNumProducers(FANIN_COUNT);
+        client.setDestCount(FANIN_COUNT);
+        client.setNumConsumers(FANOUT_COUNT);
 
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
+        createConnections(FANIN_COUNT);
+        runTestCase();
     }
 
     public void test_10_1_10() throws Exception {
-        producerCount = FANIN_COUNT;
-        consumerCount = FANOUT_COUNT;
-        destCount = 1;
+        client = new MockClient();
+        client.setNumProducers(FANIN_COUNT);
+        client.setDestCount(1);
+        client.setNumConsumers(FANOUT_COUNT);
 
-        createConnections();
-
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
+        createConnections(1);
+        runTestCase();
     }
 
     public void test_10_1_1() throws Exception {
-        producerCount = FANIN_COUNT;
-        destCount = 1;
-        consumerCount = 1;
+        client = new MockClient();
+        client.setNumProducers(FANIN_COUNT);
+        client.setDestCount(1);
+        client.setNumConsumers(1);
 
-        createConnections();
-
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
+        createConnections(1);
+        runTestCase();
     }
 
     public void test_1_1_10() throws Exception {
-        producerCount = 1;
-        destCount = 1;
-        consumerCount = FANOUT_COUNT;
-
-        createConnections();
+        client = new MockClient();
+        client.setNumProducers(1);
+        client.setDestCount(1);
+        client.setNumConsumers(FANOUT_COUNT);
 
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
+        createConnections(1);
+        runTestCase();
     }
 
     public void test_2_2_2() throws Exception {
-        producerCount = 2;
-        destCount = 2;
-        consumerCount = 2;
-
-        createConnections();
+        client = new MockClient();
+        client.setNumProducers(2);
+        client.setDestCount(2);
+        client.setNumConsumers(2);
 
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
+        createConnections(2);
+        runTestCase();
     }
 
     /**
@@ -229,43 +177,33 @@
      * @throws Exception
      */
     public void test_2_2_2_SlowConsumer() throws Exception {
-        producerCount = 2;
-        destCount = 2;
-        consumerCount = 2;
+        client = new MockClient();
+        client.setNumProducers(2);
+        client.setDestCount(2);
+        client.setNumConsumers(2);
+
+        createConnections(2);
+        client.consumer(0).setThinkTime(50);
+        runTestCase();
 
-        createConnections();
-        rcvBroker.consumers.get(0).setThinkTime(50);
-
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
     }
 
     public void test_2_2_2_Selector() throws Exception {
-        producerCount = 2;
-        destCount = 2;
-        consumerCount = 2;
+        client = new MockClient();
+        client.setNumProducers(2);
+        client.setDestCount(2);
+        client.setNumConsumers(2);
 
-        createConnections();
+        createConnections(2);
 
         // Add properties to match producers to their consumers
-        for (int i = 0; i < consumerCount; i++) {
+        for (int i = 0; i < 2; i++) {
             String property = "match" + i;
-            rcvBroker.consumers.get(i).setSelector(property);
-            sendBroker.producers.get(i).setProperty(property);
+            client.consumer(i).setSelector(property);
+            client.producer(i).setProperty(property);
         }
 
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
+        runTestCase();
     }
 
     /**
@@ -276,35 +214,20 @@
      */
     public void test_2_1_1_HighPriorityProducer() throws Exception {
 
-        producerCount = 2;
-        destCount = 1;
-        consumerCount = 1;
-
-        createConnections();
-        RemoteProducer producer = sendBroker.producers.get(0);
+        client = new MockClient();
+        client.setNumProducers(2);
+        client.setNumConsumers(1);
+        client.setDestCount(1);
+
+        createConnections(1);
+        RemoteProducer producer = client.producer(0);
+        client.includeInRateReport(producer);
         producer.setPriority(1);
         producer.getRate().setName("High Priority Producer Rate");
 
-        rcvBroker.consumers.get(0).setThinkTime(1);
+        client.consumer(0).setThinkTime(1);
 
-        // Start 'em up.
-        startServices();
-        try {
-
-            System.out.println("Checking rates for test: " + getName());
-            for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
-                Period p = new Period();
-                Thread.sleep(1000 * 5);
-                System.out.println(producer.getRate().getRateSummary(p));
-                System.out.println(totalProducerRate.getRateSummary(p));
-                System.out.println(totalConsumerRate.getRateSummary(p));
-                totalProducerRate.reset();
-                totalConsumerRate.reset();
-            }
-
-        } finally {
-            stopServices();
-        }
+        runTestCase();
     }
 
     /**
@@ -314,53 +237,26 @@
      * @throws Exception
      */
     public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
-        producerCount = 2;
-        destCount = 1;
-        consumerCount = 1;
+        client = new MockClient();
+        client.setNumProducers(2);
+        client.setNumConsumers(1);
+        client.setDestCount(1);
 
-        createConnections();
-        RemoteProducer producer = sendBroker.producers.get(0);
+        createConnections(1);
+        RemoteProducer producer = client.producer(0);
         producer.setPriority(1);
         producer.setPriorityMod(3);
         producer.getRate().setName("High Priority Producer Rate");
 
-        rcvBroker.consumers.get(0).setThinkTime(1);
-
-        // Start 'em up.
-        startServices();
-        try {
-
-            System.out.println("Checking rates for test: " + getName());
-            for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
-                Period p = new Period();
-                Thread.sleep(1000 * 5);
-                System.out.println(producer.getRate().getRateSummary(p));
-                System.out.println(totalProducerRate.getRateSummary(p));
-                System.out.println(totalConsumerRate.getRateSummary(p));
-                totalProducerRate.reset();
-                totalConsumerRate.reset();
-            }
-
-        } finally {
-            stopServices();
-        }
+        client.consumer(0).setThinkTime(1);
+        runTestCase();
     }
 
-    private void reportRates() throws InterruptedException {
-        System.out.println("Checking rates for test: " + getName() + ", " + (ptp ? "ptp" : "topic"));
-        for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
-            Period p = new Period();
-            Thread.sleep(1000 * 5);
-            System.out.println(totalProducerRate.getRateSummary(p));
-            System.out.println(totalConsumerRate.getRateSummary(p));
-            totalProducerRate.reset();
-            totalConsumerRate.reset();
-        }
-    }
+    private void createConnections(int destCount) throws Exception {
 
-    private void createConnections() throws IOException, URISyntaxException {
+        dispatcher = createDispatcher("BrokerDispatcher");
+        dispatcher.start();
 
-        FlowController.setFlowExecutor(dispatcher.createPriorityExecutor(Message.MAX_PRIORITY));
         if (multibroker) {
             sendBroker = createBroker("SendBroker", sendBrokerURI);
             rcvBroker = createBroker("RcvBroker", receiveBrokerURI);
@@ -388,46 +284,27 @@
             }
         }
 
-        for (int i = 0; i < producerCount; i++) {
-            Destination destination = dests[i % destCount];
-            RemoteProducer producer = createProducer(i, destination);
-            sendBroker.producers.add(producer);
-        }
-
-        for (int i = 0; i < consumerCount; i++) {
-            Destination destination = dests[i % destCount];
-            RemoteConsumer consumer = createConsumer(i, destination);
-            sendBroker.consumers.add(consumer);
-        }
-
-        // Create MultiBroker connections:
-        // if (multibroker) {
-        // Pipe<Message> pipe = new Pipe<Message>();
-        // sendBroker.createBrokerConnection(rcvBroker, pipe);
-        // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
-        // }
-    }
-
-    private RemoteConsumer createConsumer(int i, Destination destination) {
-        RemoteConsumer consumer = new RemoteConsumer();
-        consumer.setBroker(rcvBroker);
-        consumer.setDestination(destination);
-        consumer.setName("consumer" + (i + 1));
-        consumer.setTotalConsumerRate(totalConsumerRate);
-        consumer.setDispatcher(dispatcher);
-        return consumer;
-    }
-
-    private RemoteProducer createProducer(int id, Destination destination) {
-        RemoteProducer producer = new RemoteProducer();
-        producer.setBroker(sendBroker);
-        producer.setProducerId(id + 1);
-        producer.setName("producer" + (id + 1));
-        producer.setDestination(destination);
-        producer.setMessageIdGenerator(msgIdGenerator);
-        producer.setTotalProducerRate(totalProducerRate);
-        producer.setDispatcher(dispatcher);
-        return producer;
+        IDispatcher clientDispatcher = null;
+        if (SEPARATE_CLIENT_DISPATCHER) {
+            clientDispatcher = createDispatcher("ClientDispatcher");
+            clientDispatcher.start();
+        } else {
+            clientDispatcher = dispatcher;
+        }
+
+        // Configure Client:
+        client.setDispatcher(clientDispatcher);
+        client.setNumPriorities(PRIORITY_LEVELS);
+        client.setSendBrokerURI(sendBroker.getUri());
+        client.setReceiveBrokerURI(rcvBroker.getUri());
+        client.setPerformanceSamples(PERFORMANCE_SAMPLES);
+        client.setSamplingFrequency(1000 * SAMPLING_FREQUENCY);
+        client.setThreadsPerDispatcher(threadsPerDispatcher);
+        client.setUseInputQueues(USE_INPUT_QUEUES);
+        client.setPtp(ptp);
+        client.setTestName(getName());
+
+        client.createConnections();
     }
 
     private MockQueue createQueue(MockBroker broker, Destination destination) {
@@ -446,23 +323,37 @@
         broker.setName(name);
         broker.setUri(uri);
         broker.setDispatcher(dispatcher);
+        broker.setUseInputQueues(USE_INPUT_QUEUES);
         return broker;
     }
 
+    private void runTestCase() throws Exception {
+        // Start 'em up.
+        startServices();
+        try {
+            client.runTest();
+        } finally {
+            stopServices();
+        }
+    }
+
     private void stopServices() throws Exception {
+
         for (MockBroker broker : brokers) {
             broker.stopServices();
         }
+
+        client.getDispatcher().shutdown();
         if (dispatcher != null) {
             dispatcher.shutdown();
         }
     }
 
     private void startServices() throws Exception {
+
         for (MockBroker broker : brokers) {
             broker.startServices();
         }
-        //SelectorManager.SINGLETON.setChannelExecutor(dispatcher.createPriorityExecutor(PRIORITY_LEVELS));
     }
 
 }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java Thu Mar 12 18:04:49 2009
@@ -87,7 +87,7 @@
         public void oneway(Object command) throws IOException {
 
             try {
-                if( wireFormat!=null ) {
+                if (wireFormat != null) {
                     pipe.write(wireFormat.marshal(command));
                 } else {
                     pipe.write(command);
@@ -110,11 +110,12 @@
                         pipe.setReadReadyListener(this);
                         return true;
                     } else {
-                        if( wireFormat!=null ) {
-                            listener.onCommand(wireFormat.unmarshal((ByteSequence)o));
+                        if (wireFormat != null) {
+                            listener.onCommand(wireFormat.unmarshal((ByteSequence) o));
                         } else {
                             listener.onCommand(o);
                         }
+                        return false;
                     }
                 } catch (IOException e) {
                     listener.onException(e);
@@ -192,6 +193,10 @@
         public void setWireFormat(WireFormat wireFormat) {
             this.wireFormat = wireFormat;
         }
+
+        public void setDispatchPriority(int priority) {
+            readContext.updatePriority(priority);
+        }
     }
 
     private class PipeTransportServer implements TransportServer {
@@ -243,7 +248,7 @@
             rc.setRemoteAddress(remoteAddress);
             PipeTransport serverSide = new PipeTransport(pipe.connect());
             serverSide.setRemoteAddress(remoteAddress);
-            if( wireFormatFactory!=null ) {
+            if (wireFormatFactory != null) {
                 rc.setWireFormat(wireFormatFactory.createWireFormat());
                 serverSide.setWireFormat(wireFormatFactory.createWireFormat());
             }
@@ -268,7 +273,7 @@
             PipeTransportServer server = new PipeTransportServer();
             server.setConnectURI(uri);
             server.setName(node);
-            if( options.containsKey("wireFormat") ) {
+            if (options.containsKey("wireFormat")) {
                 server.setWireFormatFactory(createWireFormatFactory(options));
             }
             servers.put(node, server);

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java Thu Mar 12 18:04:49 2009
@@ -1,336 +0,0 @@
-package org.apache.activemq.flow;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.flow.Commands.Destination;
-import org.apache.activemq.flow.Commands.FlowControl;
-import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
-import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
-import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
-import org.apache.activemq.flow.ISinkController.FlowControllable;
-import org.apache.activemq.flow.MockBroker.DeliveryTarget;
-import org.apache.activemq.queue.SingleFlowRelay;
-import org.apache.activemq.transport.DispatchableTransport;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportListener;
-
-public class RemoteConnection implements TransportListener, DeliveryTarget {
-
-    protected Transport transport;
-    protected MockBroker broker;
-
-    protected final Object inboundMutex = new Object();
-    protected IFlowController<Message> inboundController;
-
-    protected SingleFlowRelay<Message> outputQueue;
-    protected IFlowController<Message> outboundController;
-    protected ProtocolLimiter<Message> outboundLimiter;
-    protected Flow ouboundFlow;
-
-    protected String name;
-
-    private int priorityLevels;
-
-    private final int outputWindowSize = 1000;
-    private final int outputResumeThreshold = 900;
-
-    private final int inputWindowSize = 1000;
-    private final int inputResumeThreshold = 500;
-
-    private IDispatcher dispatcher;
-    private final AtomicBoolean stopping = new AtomicBoolean();
-    protected Flow outputFlow;
-    protected boolean blockingTransport = false;
-    ExecutorService blockingWriter;
-
-    public void setBroker(MockBroker broker) {
-        this.broker = broker;
-    }
-
-    public void setTransport(Transport transport) {
-        this.transport = transport;
-    }
-
-    public void start() throws Exception {
-        transport.setTransportListener(this);
-        transport.start();
-    }
-
-    public void stop() throws Exception {
-        stopping.set(true);
-        if (transport != null) {
-            transport.stop();
-        }
-        if (blockingWriter != null) {
-            blockingWriter.shutdown();
-        }
-    }
-
-    public void onCommand(Object command) {
-        try {
-            // System.out.println("Got Command: " + command);
-            // First command in should be the name of the connection
-            if (name == null) {
-                name = (String) command;
-                initialize();
-            } else if (command.getClass() == Message.class) {
-                Message msg = (Message) command;
-                inboundController.add(msg, null);
-            } else if (command.getClass() == DestinationBuffer.class) {
-                // This is a subscription request
-                Destination destination = (Destination) command;
-
-                broker.subscribe(destination, this);
-            } else if (command.getClass() == FlowControlBuffer.class) {
-                // This is a subscription request
-                FlowControl fc = (FlowControl) command;
-                synchronized (outputQueue) {
-                    outboundLimiter.onProtocolMessage(fc);
-                }
-            } else {
-                onException(new Exception("Unrecognized command: " + command));
-            }
-        } catch (Exception e) {
-            onException(e);
-        }
-    }
-
-    protected void initialize() {
-        // Setup the input processing..
-        Flow flow = new Flow(name, false);
-        WindowLimiter<Message> limiter = new WindowLimiter<Message>(false, flow, inputWindowSize, inputResumeThreshold);
-
-        inboundController = new FlowController<Message>(new FlowControllable<Message>() {
-            public void flowElemAccepted(ISourceController<Message> controller, Message elem) {
-                messageReceived(controller, elem);
-            }
-
-            @Override
-            public String toString() {
-                return name;
-            }
-
-            public IFlowSink<Message> getFlowSink() {
-                return null;
-            }
-
-            public IFlowSource<Message> getFlowSource() {
-                return null;
-            }
-        }, flow, limiter, inboundMutex);
-
-        ouboundFlow = new Flow(name, false);
-        outboundLimiter = new WindowLimiter<Message>(true, ouboundFlow, outputWindowSize, outputResumeThreshold);
-        outputQueue = new SingleFlowRelay<Message>(ouboundFlow, name + "-outbound", outboundLimiter);
-        outboundController = outputQueue.getFlowController(ouboundFlow);
-
-        if (transport instanceof DispatchableTransport) {
-            outputQueue.setDrain(new IFlowDrain<Message>() {
-
-                public void drain(Message message, ISourceController<Message> controller) {
-                    write(message);
-                }
-            });
-
-        } else {
-            blockingTransport = true;
-            blockingWriter = Executors.newSingleThreadExecutor();
-            outputQueue.setDrain(new IFlowDrain<Message>() {
-                public void drain(final Message message, ISourceController<Message> controller) {
-                    write(message);
-                };
-            });
-            /*
-             * // Setup output processing final Executor writer =
-             * Executors.newSingleThreadExecutor(); FlowControllable<Message>
-             * controllable = new FlowControllable<Message>() { public void
-             * flowElemAccepted( final ISourceController<Message> controller,
-             * final Message elem) { writer.execute(new Runnable() { public void
-             * run() { if (!stopping.get()) { try { transport.oneway(elem);
-             * controller.elementDispatched(elem); } catch (IOException e) {
-             * onException(e); } } } }); }
-             * 
-             * public IFlowSink<Message> getFlowSink() { return null; }
-             * 
-             * public IFlowSource<Message> getFlowSource() { return null; } };
-             * 
-             * if (priorityLevels <= 1) { outboundController = new
-             * FlowController<Message>(controllable, flow, limiter,
-             * outboundMutex); } else { PrioritySizeLimiter<Message> pl = new
-             * PrioritySizeLimiter<Message>( outputWindowSize,
-             * outputResumeThreshold, priorityLevels);
-             * pl.setPriorityMapper(Message.PRIORITY_MAPPER); outboundController
-             * = new PriorityFlowController<Message>( controllable, flow, pl,
-             * outboundMutex); }
-             */
-        }
-        // outputQueue.setDispatcher(dispatcher);
-
-    }
-
-    private final void write(final Object o) {
-        synchronized (outputQueue) {
-            if (!blockingTransport) {
-                try {
-                    transport.oneway(o);
-                } catch (IOException e) {
-                    onException(e);
-                }
-            } else {
-                try {
-                    blockingWriter.execute(new Runnable() {
-                        public void run() {
-                            if (!stopping.get()) {
-                                try {
-                                    transport.oneway(o);
-                                } catch (IOException e) {
-                                    onException(e);
-                                }
-                            }
-                        }
-                    });
-                } catch (RejectedExecutionException re) {
-                    //Must be shutting down.
-                }
-            }
-        }
-    }
-
-    protected void messageReceived(ISourceController<Message> controller, Message elem) {
-        broker.router.route(controller, elem);
-        inboundController.elementDispatched(elem);
-    }
-
-    public void onException(IOException error) {
-        onException((Exception) error);
-    }
-
-    public void onException(Exception error) {
-        if (!stopping.get() && !broker.isStopping()) {
-            System.out.println("RemoteConnection error: " + error);
-            error.printStackTrace();
-        }
-    }
-
-    public void transportInterupted() {
-    }
-
-    public void transportResumed() {
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public int getPriorityLevels() {
-        return priorityLevels;
-    }
-
-    public void setPriorityLevels(int priorityLevels) {
-        this.priorityLevels = priorityLevels;
-    }
-
-    public IDispatcher getDispatcher() {
-        return dispatcher;
-    }
-
-    public void setDispatcher(IDispatcher dispatcher) {
-        this.dispatcher = dispatcher;
-        if (transport instanceof DispatchableTransport) {
-            DispatchableTransport dt = ((DispatchableTransport) transport);
-            if (name != null) {
-                dt.setName(name);
-            }
-            dt.setDispatcher(getDispatcher());
-        }
-    }
-
-    public MockBroker getBroker() {
-        return broker;
-    }
-
-    public int getOutputWindowSize() {
-        return outputWindowSize;
-    }
-
-    public int getOutputResumeThreshold() {
-        return outputResumeThreshold;
-    }
-
-    public int getInputWindowSize() {
-        return inputWindowSize;
-    }
-
-    public int getInputResumeThreshold() {
-        return inputResumeThreshold;
-    }
-
-    public IFlowSink<Message> getSink() {
-        return outputQueue;
-    }
-
-    public boolean match(Message message) {
-        return true;
-    }
-
-    private interface ProtocolLimiter<E> extends IFlowLimiter<E> {
-        public void onProtocolMessage(FlowControl m);
-    }
-
-    private class WindowLimiter<E> extends SizeLimiter<E> implements ProtocolLimiter<E> {
-        final Flow flow;
-        final boolean clientMode;
-        private int available;
-
-        public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) {
-            super(capacity, resumeThreshold);
-            this.clientMode = clientMode;
-            this.flow = flow;
-        }
-
-        public void reserve(E elem) {
-            super.reserve(elem);
-            if (!clientMode) {
-                // System.out.println(RemoteConnection.this.name + " Reserved "
-                // + this);
-            }
-        }
-
-        public void releaseReserved(E elem) {
-            super.reserve(elem);
-            if (!clientMode) {
-                // System.out.println(RemoteConnection.this.name +
-                // " Released Reserved " + this);
-            }
-        }
-
-        protected void remove(int size) {
-            super.remove(size);
-            if (!clientMode) {
-                available += size;
-                if (available >= capacity - resumeThreshold) {
-                    FlowControlBean fc = new FlowControlBean();
-                    fc.setCredit(available);
-                    write(fc.freeze());
-                    // System.out.println(RemoteConnection.this.name +
-                    // " Send Release " + available + this);
-                    available = 0;
-                }
-            }
-        }
-
-        public void onProtocolMessage(FlowControl m) {
-            remove(m.getCredit());
-        }
-
-        public int getElementSize(Message m) {
-            return m.getFlowLimiterSize();
-        }
-    }
-
-}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java Thu Mar 12 18:04:49 2009
@@ -9,7 +9,7 @@
 import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.TransportFactory;
 
-public class RemoteConsumer extends RemoteConnection{
+public class RemoteConsumer extends ClientConnection {
 
     private final MetricCounter consumerRate = new MetricCounter();
 
@@ -18,52 +18,35 @@
     private Destination destination;
     private String selector;
 
-    private boolean schedualWait;
-    
+    private boolean schedualWait = true;
+
     public void start() throws Exception {
         consumerRate.name("Consumer " + name + " Rate");
         totalConsumerRate.add(consumerRate);
-
-        URI uri = broker.getConnectURI();
-        transport = TransportFactory.compositeConnect(uri);
-        if(transport instanceof DispatchableTransport)
-        {
-            DispatchableTransport dt = ((DispatchableTransport)transport);
-            dt.setName(name);
-            dt.setDispatcher(getDispatcher());
-            schedualWait = true;
-        }
-        transport.setTransportListener(this);
-        transport.start();
-        
-        // Let the remote side know our name.
-        transport.oneway(name);
-        // Sending the destination acts as the subscribe.
-        transport.oneway(destination);
-        super.initialize();
+        super.start();
+        // subscribe:
+        write(destination);
     }
-    
+
     protected void messageReceived(final ISourceController<Message> controller, final Message elem) {
-        if( schedualWait ) {
+        if (schedualWait) {
             if (thinkTime > 0) {
-                getDispatcher().schedule(new Runnable(){
+                getDispatcher().schedule(new Runnable() {
 
                     public void run() {
                         consumerRate.increment();
                         controller.elementDispatched(elem);
                     }
-                    
+
                 }, thinkTime, TimeUnit.MILLISECONDS);
-                
-            }
-            else
-            {
+
+            } else {
                 consumerRate.increment();
                 controller.elementDispatched(elem);
             }
 
         } else {
-            if( thinkTime>0 ) {
+            if (thinkTime > 0) {
                 try {
                     Thread.sleep(thinkTime);
                 } catch (InterruptedException e) {
@@ -74,6 +57,10 @@
         }
     }
 
+    public MetricCounter getRate() {
+        return consumerRate;
+    }
+
     public void setName(String name) {
         this.name = name;
     }
@@ -112,4 +99,5 @@
 
     public void setSelector(String selector) {
         this.selector = selector;
-    }}
+    }
+}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Thu Mar 12 18:04:49 2009
@@ -1,6 +1,5 @@
 package org.apache.activemq.flow;
 
-import java.net.URI;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
@@ -9,11 +8,8 @@
 import org.apache.activemq.flow.ISinkController.FlowUnblockListener;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;
-import org.apache.activemq.transport.DispatchableTransport;
-import org.apache.activemq.transport.TransportFactory;
-
-public class RemoteProducer extends RemoteConnection implements Dispatchable, FlowUnblockListener<Message>{
 
+public class RemoteProducer extends ClientConnection implements Dispatchable, FlowUnblockListener<Message> {
 
     private final MetricCounter rate = new MetricCounter();
 
@@ -29,104 +25,84 @@
     private DispatchContext dispatchContext;
 
     private String filler;
-    private int payloadSize = 20;
-    
+    private int payloadSize = 0;
+    IFlowController<Message> outboundController;
+
     public void start() throws Exception {
-        
-        if( payloadSize>0 ) {
+
+        if (payloadSize > 0) {
             StringBuilder sb = new StringBuilder(payloadSize);
-            for( int i=0; i < payloadSize; ++i) {
-                sb.append((char)('a'+(i%26)));
+            for (int i = 0; i < payloadSize; ++i) {
+                sb.append((char) ('a' + (i % 26)));
             }
             filler = sb.toString();
         }
-        
+
         rate.name("Producer " + name + " Rate");
         totalProducerRate.add(rate);
 
-        URI uri = broker.getConnectURI();
-        transport = TransportFactory.compositeConnect(uri);
-        transport.setTransportListener(this);
-        if(transport instanceof DispatchableTransport)
-        {
-            DispatchableTransport dt = ((DispatchableTransport)transport);
-            dt.setName(name + "-client-transport");
-            dt.setDispatcher(getDispatcher());
-        }
-        super.setTransport(transport);
-       
-        super.initialize();
-        transport.start();
-        // Let the remote side know our name.
-        transport.oneway(name);
+        super.start();
+        outboundController = outputQueue.getFlowController(outboundFlow);
         dispatchContext = getDispatcher().register(this, name + "-client");
         dispatchContext.requestDispatch();
     }
-    
-    public void stop() throws Exception
-    {
-    	dispatchContext.close(false);
-    	super.stop();
-    }
-
-	public void onFlowUnblocked(ISinkController<Message> controller) {
-		dispatchContext.requestDispatch();
-	}
-
-	public boolean dispatch() {
-		while(true)
-		{
-			
-			if(next == null)
-			{
-	            int priority = this.priority;
-	            if (priorityMod > 0) {
-	                priority = counter % priorityMod == 0 ? 0 : priority;
-	            }
-	
-	            next = new Message(messageIdGenerator.getAndIncrement(), producerId, createPayload(), null, destination, priority);
-	            if (property != null) {
-	                next.setProperty(property);
-	            }
-			}
-	        
-			//If flow controlled stop until flow control is lifted.
-			if(outboundController.isSinkBlocked())
-			{
-				if(outboundController.addUnblockListener(this))
-				{
-					return true;
-				}
-			}
-			
-	        getSink().add(next, null);
-	        rate.increment();
-	        next = null;
-		}
-	}
+
+    public void stop() throws Exception {
+        dispatchContext.close(false);
+        super.stop();
+    }
+
+    public void onFlowUnblocked(ISinkController<Message> controller) {
+        dispatchContext.requestDispatch();
+    }
+
+    public boolean dispatch() {
+        while (true) {
+
+            if (next == null) {
+                int priority = this.priority;
+                if (priorityMod > 0) {
+                    priority = counter % priorityMod == 0 ? 0 : priority;
+                }
+
+                next = new Message(messageIdGenerator.getAndIncrement(), producerId, createPayload(), null, destination, priority);
+                if (property != null) {
+                    next.setProperty(property);
+                }
+            }
+
+            // If flow controlled stop until flow control is lifted.
+            if (outboundController.isSinkBlocked()) {
+                if (outboundController.addUnblockListener(this)) {
+                    return true;
+                }
+            }
+
+            getSink().add(next, null);
+            rate.increment();
+            next = null;
+            return false;
+        }
+    }
 
     private String createPayload() {
-        if( payloadSize>=0 ) {
+        if (payloadSize >= 0) {
             StringBuilder sb = new StringBuilder(payloadSize);
             sb.append(name);
             sb.append(':');
             sb.append(++counter);
             sb.append(':');
             int length = sb.length();
-            if( length <= payloadSize ) {
-                sb.append(filler.subSequence(0, payloadSize-length));
+            if (length <= payloadSize) {
+                sb.append(filler.subSequence(0, payloadSize - length));
                 return sb.toString();
             } else {
-               return sb.substring(0, payloadSize); 
+                return sb.substring(0, payloadSize);
             }
         } else {
-            return name+":"+(++counter);
+            return name + ":" + (++counter);
         }
     }
-	
-	public void setName(String name) {
-        this.name = name;
-    }
 
     public AtomicLong getMessageIdGenerator() {
         return messageIdGenerator;
@@ -195,5 +171,9 @@
     public void setPayloadSize(int messageSize) {
         this.payloadSize = messageSize;
     }
-}
 
+    @Override
+    protected void messageReceived(ISourceController<Message> controller, Message elem) {
+        controller.elementDispatched(elem);
+    }
+}