You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/17 21:21:06 UTC

svn commit: r891866 [2/2] - in /activemq/sandbox/activemq-apollo-actor: activemq-dispatcher/src/main/java/org/apache/activemq/actor/ activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/ activemq-dispatcher/src/main/java/org/apache/activemq/d...

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.DispatcherConfig;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.Destination.DestinationBean;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.util.Mapper;
+import org.apache.activemq.util.buffer.AsciiBuffer;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MockBrokerTest extends TestCase {
+
+    protected static final int PERFORMANCE_SAMPLES = 30000000;
+    protected static final int SAMPLING_FREQUENCY = 5;
+
+    protected static final int FANIN_COUNT = 10;
+    protected static final int FANOUT_COUNT = 10;
+
+    protected static final int PRIORITY_LEVELS = 10;
+    protected static final boolean USE_INPUT_QUEUES = false;
+
+    // Set to put senders and consumers on separate brokers.
+    protected boolean multibroker = false;
+
+    // Set to mockup up ptp:
+    protected boolean ptp = false;
+
+    // Set to use tcp IO
+    protected boolean tcp = false;
+    // set to force marshalling even in the NON tcp case.
+    protected boolean forceMarshalling = false;
+
+    protected String sendBrokerURI;
+    protected String receiveBrokerURI;
+
+    // Set's the number of threads to use:
+    protected static final boolean SEPARATE_CLIENT_DISPATCHER = false;
+    protected final int threadsPerDispatcher = Runtime.getRuntime().availableProcessors();
+    protected boolean usePartitionedQueue = false;
+
+    protected ArrayList<MockBroker> brokers = new ArrayList<MockBroker>();
+    protected MockBroker sendBroker;
+    protected MockBroker rcvBroker;
+    protected MockClient client;
+
+    protected Dispatcher dispatcher;
+
+    static public final Mapper<Long, Message> KEY_MAPPER = new Mapper<Long, Message>() {
+        public Long map(Message element) {
+            return element.getMsgId();
+        }
+    };
+    static public final Mapper<Integer, Message> PARTITION_MAPPER = new Mapper<Integer, Message>() {
+        public Integer map(Message element) {
+            // we modulo 10 to have at most 10 partitions which the producers
+            // gets split across.
+            return (int) (element.getProducerId() % 10);
+        }
+    };
+
+    @Override
+    protected void setUp() throws Exception {
+        if (tcp) {
+            sendBrokerURI = "tcp://localhost:10000?wireFormat=proto";
+            receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto";
+        } else {
+            if (forceMarshalling) {
+                sendBrokerURI = "pipe://SendBroker?wireFormat=proto&marshal=true";
+                receiveBrokerURI = "pipe://ReceiveBroker?wireFormat=proto&marshal=true";
+            } else {
+                sendBrokerURI = "pipe://SendBroker?wireFormat=proto";
+                receiveBrokerURI = "pipe://ReceiveBroker?wireFormat=proto";
+            }
+        }
+    }
+
+    protected Dispatcher createDispatcher(String name) {
+        return DispatcherConfig.create("test", threadsPerDispatcher);
+    }
+
+    public void test_1_1_0() throws Exception {
+
+        client = new MockClient();
+        client.setNumProducers(1);
+        client.setDestCount(1);
+        client.setNumConsumers(0);
+
+        createConnections(1);
+        runTestCase();
+    }
+
+    public void test_1_1_1() throws Exception {
+        client = new MockClient();
+        client.setNumProducers(1);
+        client.setDestCount(1);
+        client.setNumConsumers(1);
+
+        createConnections(1);
+        runTestCase();
+    }
+
+    public void test_10_10_10() throws Exception {
+        client = new MockClient();
+        client.setNumProducers(FANIN_COUNT);
+        client.setDestCount(FANIN_COUNT);
+        client.setNumConsumers(FANOUT_COUNT);
+
+        createConnections(FANIN_COUNT);
+        runTestCase();
+    }
+
+    public void test_10_1_10() throws Exception {
+        client = new MockClient();
+        client.setNumProducers(FANIN_COUNT);
+        client.setDestCount(1);
+        client.setNumConsumers(FANOUT_COUNT);
+
+        createConnections(1);
+        runTestCase();
+    }
+
+    public void test_10_1_1() throws Exception {
+        client = new MockClient();
+        client.setNumProducers(FANIN_COUNT);
+        client.setDestCount(1);
+        client.setNumConsumers(1);
+
+        createConnections(1);
+        runTestCase();
+    }
+
+    public void test_1_1_10() throws Exception {
+        client = new MockClient();
+        client.setNumProducers(1);
+        client.setDestCount(1);
+        client.setNumConsumers(FANOUT_COUNT);
+
+        createConnections(1);
+        runTestCase();
+    }
+
+    public void test_2_2_2() throws Exception {
+        client = new MockClient();
+        client.setNumProducers(2);
+        client.setDestCount(2);
+        client.setNumConsumers(2);
+
+        createConnections(2);
+        runTestCase();
+    }
+
+    /**
+     * Tests 2 producers sending to 1 destination with 2 consumres, but with
+     * consumers set to select only messages from each producer. 1 consumers is
+     * set to slow, the other producer should be able to send quickly.
+     * 
+     * @throws Exception
+     */
+    public void test_2_2_2_SlowConsumer() throws Exception {
+        client = new MockClient();
+        client.setNumProducers(2);
+        client.setDestCount(2);
+        client.setNumConsumers(2);
+
+        createConnections(2);
+        client.consumer(0).setThinkTime(50);
+        runTestCase();
+
+    }
+
+    public void test_2_2_2_Selector() throws Exception {
+        client = new MockClient();
+        client.setNumProducers(2);
+        client.setDestCount(2);
+        client.setNumConsumers(2);
+
+        createConnections(2);
+
+        // Add properties to match producers to their consumers
+        for (int i = 0; i < 2; i++) {
+            String property = "match" + i;
+            client.consumer(i).setSelector(property);
+            client.producer(i).setProperty(property);
+        }
+
+        runTestCase();
+    }
+
+    /**
+     * Test sending with 1 high priority sender. The high priority sender should
+     * have higher throughput than the other low priority senders.
+     * 
+     * @throws Exception
+     */
+    public void test_2_1_1_HighPriorityProducer() throws Exception {
+
+        client = new MockClient();
+        client.setNumProducers(2);
+        client.setNumConsumers(1);
+        client.setDestCount(1);
+
+        createConnections(1);
+        ProducerConnection producer = client.producer(0);
+        client.includeInRateReport(producer);
+        producer.setPriority(1);
+        producer.getRate().setName("High Priority Producer Rate");
+
+        client.consumer(0).setThinkTime(1);
+
+        runTestCase();
+    }
+
+    /**
+     * Test sending with 1 high priority sender. The high priority sender should
+     * have higher throughput than the other low priority senders.
+     * 
+     * @throws Exception
+     */
+    public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
+        client = new MockClient();
+        client.setNumProducers(2);
+        client.setNumConsumers(1);
+        client.setDestCount(1);
+
+        createConnections(1);
+        ProducerConnection producer = client.producer(0);
+        producer.setPriority(1);
+        producer.setPriorityMod(3);
+        producer.getRate().setName("High Priority Producer Rate");
+
+        client.consumer(0).setThinkTime(1);
+        runTestCase();
+    }
+
+    private void createConnections(int destCount) throws Exception {
+
+        dispatcher = createDispatcher("BrokerDispatcher");
+        dispatcher.retain();
+
+        if (multibroker) {
+            sendBroker = createBroker("SendBroker", sendBrokerURI);
+            rcvBroker = createBroker("RcvBroker", receiveBrokerURI);
+            brokers.add(sendBroker);
+            brokers.add(rcvBroker);
+        } else {
+            sendBroker = rcvBroker = createBroker("Broker", sendBrokerURI);
+            brokers.add(sendBroker);
+        }
+
+        DestinationBuffer[] dests = new DestinationBuffer[destCount];
+
+        for (int i = 0; i < destCount; i++) {
+            DestinationBean bean = new DestinationBean();
+            bean.setName(new AsciiBuffer("dest" + (i + 1)));
+            bean.setPtp(ptp);
+            dests[i] = bean.freeze();
+            if (ptp) {
+                MockQueue queue = createQueue(sendBroker, dests[i]);
+                sendBroker.addQueue(queue);
+                if (multibroker) {
+                    queue = createQueue(rcvBroker, dests[i]);
+                    rcvBroker.addQueue(queue);
+                }
+            }
+        }
+
+        Dispatcher clientDispatcher = null;
+        if (SEPARATE_CLIENT_DISPATCHER) {
+            clientDispatcher = createDispatcher("ClientDispatcher");
+            clientDispatcher.retain();
+        } else {
+            clientDispatcher = dispatcher;
+        }
+
+        // Configure Client:
+        client.setDispatcher(clientDispatcher);
+        client.setNumPriorities(PRIORITY_LEVELS);
+        client.setSendBrokerURI(sendBroker.getUri());
+        client.setReceiveBrokerURI(rcvBroker.getUri());
+        client.setPerformanceSamples(PERFORMANCE_SAMPLES);
+        client.setSamplingFrequency(1000 * SAMPLING_FREQUENCY);
+        client.setThreadsPerDispatcher(threadsPerDispatcher);
+        client.setPtp(ptp);
+        client.setTestName(getName());
+
+        client.createConnections();
+    }
+
+    private MockQueue createQueue(MockBroker broker, Destination destination) {
+        MockQueue queue = new MockQueue();
+        queue.setBroker(broker);
+        queue.setDestination(destination);
+        queue.setKeyExtractor(KEY_MAPPER);
+        if (usePartitionedQueue) {
+            queue.setPartitionMapper(PARTITION_MAPPER);
+        }
+        return queue;
+    }
+
+    private MockBroker createBroker(String name, String uri) {
+        MockBroker broker = new MockBroker();
+        broker.setName(name);
+        broker.setUri(uri);
+        broker.setDispatcher(dispatcher);
+        broker.setUseInputQueues(USE_INPUT_QUEUES);
+        return broker;
+    }
+
+    private void runTestCase() throws Exception {
+        // Start 'em up.
+        startServices();
+        try {
+            client.runTest();
+        } finally {
+            stopServices();
+        }
+    }
+
+    private void stopServices() throws Exception {
+
+        for (MockBroker broker : brokers) {
+            broker.stopServices();
+        }
+
+        if (dispatcher != null) {
+            dispatcher.release();
+        }
+    }
+
+    private void startServices() throws Exception {
+
+        for (MockBroker broker : brokers) {
+            broker.startServices();
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.io.FileInputStream;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.DispatcherConfig;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.Destination.DestinationBean;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.metric.Period;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.buffer.AsciiBuffer;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MockClient {
+
+    protected int performanceSamples = 3;
+    protected int samplingFrequency = 5000;
+
+    protected int numProducers = 1;
+    protected int numConsumers = 1;
+    protected int destCount = 1;
+    protected int numPriorities = 1;
+
+    // Set to mockup up ptp:
+    protected boolean ptp = false;
+
+    protected String sendBrokerURI;
+    protected String receiveBrokerURI;
+
+    // Sets the number of threads to use:
+    protected int threadsPerDispatcher = Runtime.getRuntime().availableProcessors();
+
+    protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
+    protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
+    protected ArrayList<MetricCounter> additionalReportMetrics = new ArrayList<MetricCounter>();
+    protected boolean includeDetailedRates = false;
+
+    protected Dispatcher dispatcher;
+
+    public ConsumerConnection consumer(int index) {
+        return consumers.get(index);
+    }
+
+    public ProducerConnection producer(int index) {
+        return producers.get(index);
+    }
+
+    public int getThreadsPerDispatcher() {
+        return threadsPerDispatcher;
+    }
+
+    public void setThreadsPerDispatcher(int threadPoolSize) {
+        this.threadsPerDispatcher = threadPoolSize;
+    }
+    
+    public void setIncludeDetailedRates(boolean includeDetailedRates) {
+        this.includeDetailedRates = includeDetailedRates;
+    }
+
+    public boolean getIncludeDetailedRates() {
+        return includeDetailedRates;
+    }
+
+    public void includeInRateReport(ProducerConnection producer) {
+        additionalReportMetrics.add(producer.getRate());
+    }
+
+    public void includeInRateReport(ConsumerConnection consumer) {
+        additionalReportMetrics.add(consumer.getRate());
+    }
+    
+    public int getSamplingFrequency() {
+        return samplingFrequency;
+    }
+
+    public void setSamplingFrequency(int samplingFrequency) {
+        this.samplingFrequency = samplingFrequency;
+    }
+
+
+    public int getNumProducers() {
+        return numProducers;
+    }
+
+    public void setNumProducers(int numProducers) {
+        this.numProducers = numProducers;
+    }
+
+    public int getNumConsumers() {
+        return numConsumers;
+    }
+
+    public void setNumConsumers(int numConsumers) {
+        this.numConsumers = numConsumers;
+    }
+
+    public int getDestCount() {
+        return destCount;
+    }
+
+    public void setDestCount(int destCount) {
+        this.destCount = destCount;
+    }
+
+    public int getNumPriorities() {
+        return numPriorities;
+    }
+
+    public void setNumPriorities(int numPriorities) {
+        this.numPriorities = numPriorities;
+    }
+
+    public boolean isPtp() {
+        return ptp;
+    }
+
+    public void setPtp(boolean ptp) {
+        this.ptp = ptp;
+    }
+
+    public String getSendBrokerURI() {
+        return sendBrokerURI;
+    }
+
+    public void setSendBrokerURI(String sendBrokerURI) {
+        this.sendBrokerURI = sendBrokerURI;
+    }
+
+    public String getReceiveBrokerURI() {
+        return receiveBrokerURI;
+    }
+
+    public void setReceiveBrokerURI(String receiveBrokerURI) {
+        this.receiveBrokerURI = receiveBrokerURI;
+    }
+
+    public int getPerformanceSamples() {
+        return performanceSamples;
+    }
+
+    
+    protected final AtomicLong msgIdGenerator = new AtomicLong();
+
+    final ArrayList<ProducerConnection> producers = new ArrayList<ProducerConnection>();
+    final ArrayList<ConsumerConnection> consumers = new ArrayList<ConsumerConnection>();
+
+    private String testName;
+
+    private void createConsumer(int i, String connectUri, Destination destination) throws URISyntaxException {
+        ConsumerConnection consumer = new ConsumerConnection();
+        consumer.setDestination(destination);
+        consumer.setName("consumer" + (i + 1));
+        consumer.setTotalConsumerRate(totalConsumerRate);
+        consumer.setDispatcher(dispatcher);
+        consumer.setConnectUri(connectUri);
+        consumers.add(consumer);
+    }
+
+    private void createProducer(int id, String connectUri, Destination destination) throws URISyntaxException {
+        ProducerConnection producer = new ProducerConnection();
+        producer.setProducerId(id + 1);
+        producer.setName("producer" + (id + 1));
+        producer.setDestination(destination);
+        producer.setMessageIdGenerator(msgIdGenerator);
+        producer.setTotalProducerRate(totalProducerRate);
+        producer.setDispatcher(dispatcher);
+        producer.setConnectUri(connectUri);
+        producers.add(producer);
+    }
+
+    private void reportRates() throws InterruptedException {
+        System.out.println("Checking rates for test: " + getTestName() + ", " + (ptp ? "ptp" : "topic"));
+        for (int i = 0; i < performanceSamples; i++) {
+            Period p = new Period();
+            Thread.sleep(samplingFrequency);
+            System.out.println(totalProducerRate.getRateSummary(p));
+            System.out.println(totalConsumerRate.getRateSummary(p));
+            if (includeDetailedRates) {
+                System.out.println(totalProducerRate.getChildRateSummary(p));
+                System.out.println(totalConsumerRate.getChildRateSummary(p));
+            }
+            totalProducerRate.reset();
+            totalConsumerRate.reset();
+        }
+    }
+
+    public void setTestName(String testName) {
+        this.testName = testName;
+    }
+
+    public void setPerformanceSamples(int samples) {
+        this.performanceSamples = samples;
+    }
+
+    public String getTestName() {
+        return testName;
+    }
+
+    public void setDispatcher(Dispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public void runTest() throws Exception {
+        getDispatcher().retain();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    private void startServices() throws Exception {
+//        BaseTestConnection.setInShutdown(false, dispatcher);
+        for (ConsumerConnection connection : consumers) {
+            connection.retain();
+        }
+
+        for (ProducerConnection connection : producers) {
+            connection.retain();
+        }
+    }
+
+    private void stopServices() throws Exception {
+//        BaseTestConnection.setInShutdown(true, dispatcher);
+        for (ProducerConnection connection : producers) {
+            connection.release();
+        }
+        for (ConsumerConnection connection : consumers) {
+            connection.release();
+        }
+    }
+
+    public void createConnections() throws Exception {
+
+        DestinationBuffer[] dests = new DestinationBuffer[destCount];
+
+        for (int i = 0; i < destCount; i++) {
+            DestinationBean bean = new DestinationBean();
+            bean.setName(new AsciiBuffer("dest" + (i + 1)));
+            bean.setPtp(ptp);
+            dests[i] = bean.freeze();
+        }
+
+        for (int i = 0; i < numProducers; i++) {
+            Destination destination = dests[i % destCount];
+            createProducer(i, sendBrokerURI, destination);
+        }
+
+        for (int i = 0; i < numConsumers; i++) {
+            Destination destination = dests[i % destCount];
+            createConsumer(i, receiveBrokerURI, destination);
+        }
+    }
+
+    public Dispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    protected Dispatcher createDispatcher() {
+        if (dispatcher == null) {
+            dispatcher = DispatcherConfig.create("client", threadsPerDispatcher);
+        }
+        return dispatcher;
+    }
+
+    /**
+     * Run the broker as a standalone app
+     * 
+     * @param args
+     *            The arguments.
+     * @throws Exception
+     */
+    public static void main(String[] args) throws Exception {
+        MockClient test = new MockClient();
+        test.createDispatcher();
+        
+        Properties props = new Properties();
+        if (args.length > 0) {
+            props.load(new FileInputStream(args[0]));
+            IntrospectionSupport.setProperties(test, props);
+        }
+        System.out.println(IntrospectionSupport.toString(test));
+        try
+        {
+            test.getDispatcher().retain();
+            test.createConnections();
+            test.runTest();
+        }
+        finally
+        {
+            test.getDispatcher().release();
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockQueue.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockQueue.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,202 @@
+/**
+ * 
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.util.HashMap;
+
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.PersistencePolicy;
+import org.apache.activemq.queue.Subscription;
+import org.apache.activemq.util.Mapper;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class MockQueue implements DeliveryTarget {
+
+    HashMap<DeliveryTarget, Subscription<Message>> subs = new HashMap<DeliveryTarget, Subscription<Message>>();
+    private Destination destination;
+    private IQueue<Long, Message> queue;
+    private MockBroker broker;
+
+    private Mapper<Integer, Message> partitionMapper;
+    private Mapper<Long, Message> keyExtractor;
+//    private final MockStoreAdapater store = new MockStoreAdapater();
+    private static final PersistencePolicy<Message> NO_PERSISTENCE = new PersistencePolicy.NON_PERSISTENT_POLICY<Message>();
+    
+    public void add(Message msg, Runnable r) {
+        throw new RuntimeException("please implement me.");
+    }
+    public boolean hasSelector() {
+        return false;
+    }
+    public boolean match(Message message) {
+        return true;
+    }
+
+//    private IQueue<Long, Message> createQueue() {
+//
+//        if (partitionMapper != null) {
+//            PartitionedQueue<Long, Message> queue = new PartitionedQueue<Long, Message>(destination.getName().toString()) {
+//                @Override
+//                public IQueue<Long, Message> createPartition(int partitionKey) {
+//                    return createSharedFlowQueue();
+//                }
+//            };
+//            queue.setPartitionMapper(partitionMapper);
+//            queue.setResourceName(destination.getName().toString());
+//            queue.setStore(store);
+//            queue.setPersistencePolicy(NO_PERSISTENCE);
+//            queue.initialize(0, 0, 0, 0);
+//            return queue;
+//        } else {
+//            return createSharedFlowQueue();
+//        }
+//    }
+//
+//    private IQueue<Long, Message> createSharedFlowQueue() {
+//        PrioritySizeLimiter<Message> limiter = new PrioritySizeLimiter<Message>(100, 1, MockBrokerTest.PRIORITY_LEVELS);
+//        limiter.setPriorityMapper(Message.PRIORITY_MAPPER);
+//        SharedPriorityQueue<Long, Message> queue = new SharedPriorityQueue<Long, Message>(destination.getName().toString(), limiter);
+//        queue.setKeyMapper(keyExtractor);
+//        queue.setAutoRelease(true);
+//        queue.setDispatcher(broker.getDispatcher());
+//        queue.setStore(store);
+//        queue.setPersistencePolicy(NO_PERSISTENCE);
+//        queue.initialize(0, 0, 0, 0);
+//        return queue;
+//    }
+//
+//    public final void deliver(ISourceController<Message> source, Message msg) {
+//        queue.add(msg, source);
+//    }
+//
+    public final Destination getDestination() {
+        return destination;
+    }
+
+    public final void addConsumer(final DeliveryTarget dt) {
+        throw new RuntimeException("please implement me.");
+//        Subscription<Message> sub = new Subscription<Message>() {
+//
+//            public boolean isBrowser() {
+//                return false;
+//            }
+//
+//            public boolean matches(Message message) {
+//                return dt.match(message);
+//            }
+//
+//            public boolean isRemoveOnDispatch(Message message) {
+//                return true;
+//            }
+//
+//            public boolean isExclusive() {
+//                return false;
+//            }
+//
+//            public IFlowSink<Message> getSink() {
+//                return dt.getSink();
+//            }
+//
+//            @Override
+//            public String toString() {
+//                return getSink().toString();
+//            }
+//
+//            public boolean hasSelector() {
+//                return dt.hasSelector();
+//            }
+//
+//            public boolean offer(Message elem, ISourceController<?> controller, SubscriptionDelivery<Message> ackCallback) {
+//                return getSink().offer(elem, controller);
+//            }
+//
+//            public void add(Message elem, ISourceController<?> controller, SubscriptionDelivery<Message> ackCallback) {
+//                getSink().add(elem, controller);
+//            }
+//        };
+//        subs.put(dt, sub);
+//        queue.addSubscription(sub);
+    }
+//
+//    public boolean removeSubscirption(final DeliveryTarget dt) {
+//        Subscription<Message> sub = subs.remove(dt);
+//        if (sub != null) {
+//            return queue.removeSubscription(sub);
+//        }
+//        return false;
+//    }
+//
+    public void start() throws Exception {
+//        queue = createQueue();
+//        queue.start();
+    }
+
+    public void stop() throws Exception {
+    }
+
+    public MockBroker getBroker() {
+        return broker;
+    }
+
+    public void setBroker(MockBroker broker) {
+        this.broker = broker;
+    }
+
+    public Mapper<Integer, Message> getPartitionMapper() {
+        return partitionMapper;
+    }
+
+    public void setPartitionMapper(Mapper<Integer, Message> partitionMapper) {
+        this.partitionMapper = partitionMapper;
+    }
+
+    public Mapper<Long, Message> getKeyExtractor() {
+        return keyExtractor;
+    }
+
+    public void setKeyExtractor(Mapper<Long, Message> keyExtractor) {
+        this.keyExtractor = keyExtractor;
+    }
+
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
+//
+//    static final class MockStoreAdapater implements QueueStore<Long, Message> {
+//
+//        MockStoreAdapater() {
+//
+//        }
+//
+//        public final void deleteQueueElement(SaveableQueueElement<Message> elem) {
+//
+//        }
+//
+//        public final boolean isFromStore(Message elem) {
+//            return false;
+//        }
+//
+//        public final void persistQueueElement(SaveableQueueElement<Message> elem, ISourceController<?> controller, boolean delayable) {
+//            // Noop;
+//        }
+//
+//        public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<Message> listener) {
+//            throw new UnsupportedOperationException("Mock broker doesn't support persistence");
+//        }
+//
+//        public final void addQueue(QueueDescriptor queue) {
+//
+//        }
+//
+//        public final void deleteQueue(QueueDescriptor queue) {
+//
+//        }
+//
+//    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.actor.ActorProxy;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ProducerConnection extends ClientConnection {
+    
+    private int priority;
+    private int priorityMod;
+    private int producerId;
+    private Destination destination;
+    private String property;
+    private MetricAggregator totalProducerRate;
+    private int payloadSize = 0;
+    private final MetricCounter rate = new MetricCounter();
+    private AtomicLong messageIdGenerator;
+
+    protected void createActor() {
+        actor = ActorProxy.create(Protocol.class, new ProducerProtocolImpl(), dispatchQueue);
+    }
+
+    class ProducerProtocolImpl extends ClientProtocolImpl {
+        
+        private String filler;
+        private int payloadCounter;
+
+        @Override
+        public void start() {
+            rate.name("Producer " + name + " Rate");
+            totalProducerRate.add(rate);
+
+            if (payloadSize > 0) {
+                StringBuilder sb = new StringBuilder(payloadSize);
+                for (int i = 0; i < payloadSize; ++i) {
+                    sb.append((char) ('a' + (i % 26)));
+                }
+                filler = sb.toString();
+            }
+            super.start();
+        }
+        
+        @Override
+        public void onConnect() {
+            super.onConnect();
+            produceMessages();
+        }
+
+        protected void onSessionResume() {
+            produceMessages();
+        }
+        
+        private void produceMessages() {
+            while( !isSessionSendBlocked() ) {
+                int p = priority;
+                if (priorityMod > 0) {
+                    p = payloadCounter % priorityMod == 0 ? 0 : p;
+                }
+
+                Message next = new Message(messageIdGenerator.incrementAndGet(), producerId, createPayload(), null, destination, p);
+                if (property != null) {
+                    next.setProperty(property);
+                }
+                sessionSend(next);
+                rate.increment();
+            }
+        }
+        
+        private String createPayload() {
+            if (payloadSize >= 0) {
+                StringBuilder sb = new StringBuilder(payloadSize);
+                sb.append(name);
+                sb.append(':');
+                sb.append(++payloadCounter);
+                sb.append(':');
+                int length = sb.length();
+                if (length <= payloadSize) {
+                    sb.append(filler.subSequence(0, payloadSize - length));
+                    return sb.toString();
+                } else {
+                    return sb.substring(0, payloadSize);
+                }
+            } else {
+                return name + ":" + (++payloadCounter);
+            }
+        }
+        
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public void setPriority(int priority) {
+        this.priority = priority;
+    }
+
+    public int getPriorityMod() {
+        return priorityMod;
+    }
+
+    public void setPriorityMod(int priorityMod) {
+        this.priorityMod = priorityMod;
+    }
+
+    public int getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(int producerId) {
+        this.producerId = producerId;
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
+
+    public String getProperty() {
+        return property;
+    }
+
+    public void setProperty(String property) {
+        this.property = property;
+    }
+
+    public MetricAggregator getTotalProducerRate() {
+        return totalProducerRate;
+    }
+
+    public void setTotalProducerRate(MetricAggregator totalProducerRate) {
+        this.totalProducerRate = totalProducerRate;
+    }
+
+    public int getPayloadSize() {
+        return payloadSize;
+    }
+
+    public void setPayloadSize(int payloadSize) {
+        this.payloadSize = payloadSize;
+    }
+
+    public MetricCounter getRate() {
+        return rate;
+    }
+
+    public AtomicLong getMessageIdGenerator() {
+        return messageIdGenerator;
+    }
+
+    public void setMessageIdGenerator(AtomicLong messageIdGenerator) {
+        this.messageIdGenerator = messageIdGenerator;
+    }
+    
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Router.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Router.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Router.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,46 @@
+/**
+ * 
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.util.buffer.AsciiBuffer;
+
+import static org.apache.activemq.dispatch.internal.RunnableSupport.*;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Router {
+    final HashMap<AsciiBuffer, Collection<DeliveryTarget>> lookupTable = new HashMap<AsciiBuffer, Collection<DeliveryTarget>>();
+
+    final synchronized void bind(DeliveryTarget dt, Destination destination) {
+        AsciiBuffer key = destination.getName();
+        Collection<DeliveryTarget> targets = lookupTable.get(key);
+        if (targets == null) {
+            targets = new ArrayList<DeliveryTarget>();
+            lookupTable.put(key, targets);
+        }
+        targets.add(dt);
+    }
+
+    final void route(Message msg, DispatchQueue queue, Runnable onRouteCompleted) {
+        AsciiBuffer key = msg.getDestination().getName();
+        Collection<DeliveryTarget> targets = lookupTable.get(key);
+        if( targets == null ) 
+            return;
+        
+        Runnable r = runOnceAfter(queue, onRouteCompleted, targets.size());
+        for (DeliveryTarget dt : targets) {
+            if ( dt.match(msg) ) {
+                dt.add(msg, r);
+            }
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/Transport.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/Transport.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/Transport.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.transport;
+
+import org.apache.activemq.dispatch.DispatchObject;
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Transport extends DispatchObject {
+    
+    public void setHandler(TransportHandler hanlder);
+    
+    public void send(Object message);
+    public void send(Object message, Runnable onCompleted, DispatchQueue queue);
+    
+    String getRemoteAddress();
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactory.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactory.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactory.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.transport;
+
+import org.apache.activemq.dispatch.Dispatcher;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface TransportFactory {
+
+    Transport connect(Dispatcher dispatcher, String connectUri);
+
+    TransportServer bind(Dispatcher dispatcher, String bindUri);
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactorySystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactorySystem.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactorySystem.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactorySystem.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.transport;
+
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.queue.actor.transport.pipe.PipeTransportFactory;
+
+
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class TransportFactorySystem {
+
+    public static Transport connect(Dispatcher dispatcher, String connectUri) {
+        return new PipeTransportFactory().connect(dispatcher, connectUri);
+    }
+
+    public static TransportServer bind(Dispatcher dispatcher, String bindUri) {
+        return new PipeTransportFactory().bind(dispatcher, bindUri);
+    }
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportHandler.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportHandler.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportHandler.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.queue.actor.transport;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface TransportHandler {
+    void onConnect();
+    void onRecevie(Object message);
+    void onFailure(Exception failure);
+    void onDisconnect();
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServer.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServer.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServer.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.transport;
+
+import org.apache.activemq.dispatch.DispatchObject;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface TransportServer extends DispatchObject {
+
+    void setHandler(TransportServerHandler handler);
+    
+    String getConnectURI();
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServerHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServerHandler.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServerHandler.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServerHandler.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.transport;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface TransportServerHandler {
+
+    void onBind();
+    void onUnbind();
+
+    void onAccept(Transport transport);
+    void onFailure(Exception failure);
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.transport.pipe;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.actor.ActorProxy;
+import org.apache.activemq.dispatch.DispatchObject;
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.queue.actor.transport.Transport;
+import org.apache.activemq.queue.actor.transport.TransportFactory;
+import org.apache.activemq.queue.actor.transport.TransportHandler;
+import org.apache.activemq.queue.actor.transport.TransportServer;
+import org.apache.activemq.queue.actor.transport.TransportServerHandler;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class PipeTransportFactory implements TransportFactory {
+
+    static protected final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
+    
+    static void perform_unbind(PipeTransportServer server) {
+        synchronized(servers) {
+            servers.remove(server.name);
+        }
+    }
+    
+    static void perform_bind(PipeTransportServer server) throws IOException {
+        synchronized(servers) {
+            if (servers.containsKey(server.name)) {
+                throw new IOException("Server already bound: " + server.name);
+            }
+            servers.put(server.name, server);
+        }
+    }
+
+    static void perform_connect(PipeTransport clientTransport) throws IOException {
+        PipeTransportServer server;
+        synchronized(servers) {
+            server = servers.get(clientTransport.name);
+            if( server == null ) {
+                throw new IOException("Server not bound: " + clientTransport.name);
+            }
+        }
+        PipeTransport serverTransport = new PipeTransport(server.dispatcher);
+        clientTransport.peer = serverTransport.actor;
+        serverTransport.peer = clientTransport.actor;
+        server.actor.onConnect(serverTransport);
+    }
+    
+    
+    public TransportServer bind(Dispatcher dispatcher, String bindUri) {
+        String name;
+        Map<String, String> options;
+        try {
+            URI uri = new URI(bindUri);
+            name = uri.getHost();
+            options = URISupport.parseParamters(uri);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Invalid bind uri: "+e, e);
+        }
+
+        PipeTransportServer rc = new PipeTransportServer(dispatcher);
+        rc.connectURI = bindUri;
+        IntrospectionSupport.setProperties(rc, options);
+        if (!options.isEmpty()) {
+            throw new IllegalArgumentException("Invalid bind uri parameters: " + options);
+        }
+        rc.name = name;
+        return rc;
+    }
+
+    public Transport connect(Dispatcher dispatcher, String connectUri) {
+        
+        String name;
+        Map<String, String> options;
+        try {
+            URI uri = new URI(connectUri);
+            name = uri.getHost();
+            options = URISupport.parseParamters(uri);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Invalid connect uri: "+e, e);
+        }
+        
+        PipeTransport rc = new PipeTransport(dispatcher);
+        IntrospectionSupport.setProperties(rc, options);
+        if (!options.isEmpty()) {
+            throw new IllegalArgumentException("Invalid connect uri parameters: " + options);
+        }
+        rc.connnectAddress = connectUri;
+        rc.name = name;
+        return rc;
+
+    }
+    
+    public static class DispatchObjectFilter implements DispatchObject {
+        protected DispatchObject next;
+        
+        public DispatchObjectFilter() {
+        }
+
+        public DispatchObjectFilter(DispatchObject next) {
+            this.next = next;
+        }
+        
+        public void addShutdownWatcher(Runnable shutdownWatcher) {
+            next.addShutdownWatcher(shutdownWatcher);
+        }
+        public <Context> Context getContext() {
+            return next.getContext();
+        }
+        public DispatchQueue getTargetQueue() {
+            return next.getTargetQueue();
+        }
+        public void release() {
+            next.release();
+        }
+        public void resume() {
+            next.resume();
+        }
+        public void retain() {
+            next.retain();
+        }
+        public <Context> void setContext(Context context) {
+            next.setContext(context);
+        }
+        public void setTargetQueue(DispatchQueue queue) {
+            next.setTargetQueue(queue);
+        }
+        public void suspend() {
+            next.suspend();
+        }
+    }
+    
+    interface PipeTransportServerActor {
+        public void onBind();
+        public void onConnect(PipeTransport serverSide);
+        public void onUnbind();
+    }
+
+    protected class PipeTransportServer extends DispatchObjectFilter implements TransportServer, PipeTransportServerActor {
+        
+        private final Dispatcher dispatcher;
+        private final DispatchQueue dispatchQueue;
+        private final PipeTransportServerActor actor;
+
+        protected String connectURI;
+        protected TransportServerHandler handler;
+        protected String name;
+        protected String wireFormat;
+        protected WireFormatFactory wireFormatFactory;
+        protected boolean marshal;
+        
+        protected final AtomicInteger suspendCounter = new AtomicInteger();
+        protected long connectionCounter;
+
+        public PipeTransportServer(Dispatcher dispatcher) {
+            super( dispatcher.createSerialQueue(null) );
+            this.dispatcher = dispatcher;
+            dispatchQueue = (DispatchQueue) next;
+            dispatchQueue.suspend();
+            this.actor = ActorProxy.create(PipeTransportServerActor.class, this, dispatchQueue);
+            this.actor.onBind();
+        }
+        
+        public void onBind() {
+            try {
+                perform_bind(this);
+                handler.onBind();
+            } catch (IOException e) {
+                handler.onFailure(e);
+            }
+        }
+
+        public void onUnbind() {
+            perform_unbind(this);
+            handler.onUnbind();
+        }
+        
+        public void setHandler(TransportServerHandler handler) {
+            this.handler = handler;
+        }
+
+        public void onConnect(PipeTransport serverSide) {
+            long connectionId = connectionCounter++;
+            serverSide.remoteAddress = connectURI.toString() + "#" + connectionId;
+            handler.onAccept(serverSide);
+        }
+
+        public String getConnectURI() {
+            return connectURI;
+        }
+
+        public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
+            this.wireFormatFactory = wireFormatFactory;
+        }
+
+        public boolean isMarshal() {
+            return marshal;
+        }
+
+        public void setMarshal(boolean marshal) {
+            this.marshal = marshal;
+        }
+
+        public String getWireFormat() {
+            return wireFormat;
+        }
+
+        public void setWireFormat(String wireFormat) {
+            this.wireFormat = wireFormat;
+        }
+
+    }
+
+    interface PipeTransportActor {
+        public void onConnect();
+        public void onDispatch(Object message, Runnable onCompleted, DispatchQueue queue);
+        public void onDisconnect();
+        public void onFailure(Exception e);
+    }
+
+    protected static class PipeTransport extends DispatchObjectFilter implements PipeTransportActor, Transport {
+
+        public String connnectAddress;
+        public String remoteAddress;
+        private PipeTransportActor actor;
+        private PipeTransportActor peer;
+        
+        private DispatchQueue dispatchQueue;
+        private TransportHandler handler;
+        private String name;
+        private WireFormat wf;
+        private String wireFormat;
+        private boolean marshal;
+        
+        protected final AtomicInteger suspendCounter = new AtomicInteger();
+        
+        public PipeTransport(Dispatcher dispatcher) {
+            super( dispatcher.createSerialQueue(null) );
+            this.dispatchQueue = (DispatchQueue) next;
+            this.dispatchQueue.suspend();
+            
+            // Queue up the connect event so it's the first thing that gets executed when
+            // this object gets resumed..
+            this.actor = ActorProxy.create(PipeTransportActor.class, this, dispatchQueue);
+            this.actor.onConnect();
+        }
+        
+        public void setHandler(TransportHandler hanlder) {
+            this.handler = hanlder;
+        }
+        
+        public void onConnect() {
+            try {
+                if (connnectAddress != null) {
+                    // Client side connect case...
+                    perform_connect(this);
+                    remoteAddress = connnectAddress;
+                    handler.onConnect();
+                } else {
+                    // Server side connect case...
+                    if( peer==null || remoteAddress==null ) {
+                        throw new IOException("Server transport not properly initialized.");
+                    }
+                    handler.onConnect();
+                }
+            } catch (IOException e) {
+                handler.onFailure(e);
+            }
+        }
+
+        public void send(Object message) {
+            send(message, null, null);
+        }
+
+        public void send(Object message, Runnable onCompleted, DispatchQueue queue) {
+            try {
+                if( peer==null ) {
+                    throw new IOException("not connected");
+                }
+                if (wf != null && marshal) {
+                    message = wf.marshal(message);
+                }
+            } catch (IOException e) {
+                actor.onFailure(e);
+                complete(onCompleted, queue);
+                return;
+            }
+            peer.onDispatch(message, onCompleted, queue);
+        }
+
+        public void onDispatch(Object message, Runnable onCompleted, DispatchQueue queue) {
+            try {
+                Object m = message;
+                if (wf != null && marshal) {
+                    try {
+                        m = wf.unmarshal((Buffer) m);
+                    } catch (IOException e) {
+                        handler.onFailure(e);
+                        return;
+                    }
+                }
+                handler.onRecevie(m);
+            } finally {
+                complete(onCompleted, queue);
+            }
+        }
+
+        private void complete(Runnable onCompleted, DispatchQueue queue) {
+            if( onCompleted!=null ) {
+                if(queue!=null) {
+                    queue.dispatchAsync(onCompleted);
+                } else {
+                    onCompleted.run();
+                }
+            }
+        }
+        
+        public void onDisconnect() {
+            handler.onDisconnect();
+        }
+        public void onFailure(Exception e) {
+            handler.onFailure(e);
+        }
+
+        public String getRemoteAddress() {
+            return remoteAddress;
+        }
+
+        public void setWireFormat(String wireFormat) {
+            this.wireFormat = wireFormat;
+        }
+
+        public String getWireFormat() {
+            return wireFormat;
+        }
+
+
+    }
+}