You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/17 21:21:06 UTC
svn commit: r891866 [2/2] - in /activemq/sandbox/activemq-apollo-actor:
activemq-dispatcher/src/main/java/org/apache/activemq/actor/
activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/
activemq-dispatcher/src/main/java/org/apache/activemq/d...
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.DispatcherConfig;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.Destination.DestinationBean;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.util.Mapper;
+import org.apache.activemq.util.buffer.AsciiBuffer;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MockBrokerTest extends TestCase {
+
+ protected static final int PERFORMANCE_SAMPLES = 30000000;
+ protected static final int SAMPLING_FREQUENCY = 5;
+
+ protected static final int FANIN_COUNT = 10;
+ protected static final int FANOUT_COUNT = 10;
+
+ protected static final int PRIORITY_LEVELS = 10;
+ protected static final boolean USE_INPUT_QUEUES = false;
+
+ // Set to put senders and consumers on separate brokers.
+ protected boolean multibroker = false;
+
+ // Set to mock up ptp:
+ protected boolean ptp = false;
+
+ // Set to use tcp IO
+ protected boolean tcp = false;
+ // set to force marshalling even in the NON tcp case.
+ protected boolean forceMarshalling = false;
+
+ protected String sendBrokerURI;
+ protected String receiveBrokerURI;
+
+ // Sets the number of threads to use:
+ protected static final boolean SEPARATE_CLIENT_DISPATCHER = false;
+ protected final int threadsPerDispatcher = Runtime.getRuntime().availableProcessors();
+ protected boolean usePartitionedQueue = false;
+
+ protected ArrayList<MockBroker> brokers = new ArrayList<MockBroker>();
+ protected MockBroker sendBroker;
+ protected MockBroker rcvBroker;
+ protected MockClient client;
+
+ protected Dispatcher dispatcher;
+
+ static public final Mapper<Long, Message> KEY_MAPPER = new Mapper<Long, Message>() {
+ public Long map(Message element) {
+ return element.getMsgId();
+ }
+ };
+ static public final Mapper<Integer, Message> PARTITION_MAPPER = new Mapper<Integer, Message>() {
+ public Integer map(Message element) {
+ // we modulo 10 to have at most 10 partitions which the producers
+ // gets split across.
+ return (int) (element.getProducerId() % 10);
+ }
+ };
+
+ /**
+  * Picks the send/receive broker URIs for the configured transport: fixed
+  * localhost TCP ports when {@code tcp} is set, otherwise pipe URIs, with
+  * {@code marshal=true} appended when {@code forceMarshalling} is set.
+  */
+ @Override
+ protected void setUp() throws Exception {
+     if (tcp) {
+         sendBrokerURI = "tcp://localhost:10000?wireFormat=proto";
+         receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto";
+         return;
+     }
+
+     // Pipe transport: marshalling is only requested explicitly.
+     sendBrokerURI = forceMarshalling
+             ? "pipe://SendBroker?wireFormat=proto&marshal=true"
+             : "pipe://SendBroker?wireFormat=proto";
+     receiveBrokerURI = forceMarshalling
+             ? "pipe://ReceiveBroker?wireFormat=proto&marshal=true"
+             : "pipe://ReceiveBroker?wireFormat=proto";
+ }
+
+ /**
+  * Creates a dispatcher sized by {@code threadsPerDispatcher}.
+  *
+  * @param name
+  *            label passed to {@code DispatcherConfig.create}. It used to be
+  *            ignored in favour of the fixed label "test", which made the
+  *            broker and client dispatchers indistinguishable.
+  * @return the newly created dispatcher
+  */
+ protected Dispatcher createDispatcher(String name) {
+     return DispatcherConfig.create(name, threadsPerDispatcher);
+ }
+
+ public void test_1_1_0() throws Exception {
+
+ client = new MockClient();
+ client.setNumProducers(1);
+ client.setDestCount(1);
+ client.setNumConsumers(0);
+
+ createConnections(1);
+ runTestCase();
+ }
+
+ public void test_1_1_1() throws Exception {
+ client = new MockClient();
+ client.setNumProducers(1);
+ client.setDestCount(1);
+ client.setNumConsumers(1);
+
+ createConnections(1);
+ runTestCase();
+ }
+
+ public void test_10_10_10() throws Exception {
+ client = new MockClient();
+ client.setNumProducers(FANIN_COUNT);
+ client.setDestCount(FANIN_COUNT);
+ client.setNumConsumers(FANOUT_COUNT);
+
+ createConnections(FANIN_COUNT);
+ runTestCase();
+ }
+
+ public void test_10_1_10() throws Exception {
+ client = new MockClient();
+ client.setNumProducers(FANIN_COUNT);
+ client.setDestCount(1);
+ client.setNumConsumers(FANOUT_COUNT);
+
+ createConnections(1);
+ runTestCase();
+ }
+
+ public void test_10_1_1() throws Exception {
+ client = new MockClient();
+ client.setNumProducers(FANIN_COUNT);
+ client.setDestCount(1);
+ client.setNumConsumers(1);
+
+ createConnections(1);
+ runTestCase();
+ }
+
+ public void test_1_1_10() throws Exception {
+ client = new MockClient();
+ client.setNumProducers(1);
+ client.setDestCount(1);
+ client.setNumConsumers(FANOUT_COUNT);
+
+ createConnections(1);
+ runTestCase();
+ }
+
+ public void test_2_2_2() throws Exception {
+ client = new MockClient();
+ client.setNumProducers(2);
+ client.setDestCount(2);
+ client.setNumConsumers(2);
+
+ createConnections(2);
+ runTestCase();
+ }
+
+ /**
+ * Runs 2 producers, 2 destinations and 2 consumers where consumer 0 is given
+ * a think time of 50 (units not visible here; presumably ms) to make it slow.
+ * NOTE(review): no selectors are configured in this method; presumably the
+ * point is that the producer feeding the other consumer stays fast - confirm.
+ * @throws Exception
+ */
+ public void test_2_2_2_SlowConsumer() throws Exception {
+ client = new MockClient();
+ client.setNumProducers(2);
+ client.setDestCount(2);
+ client.setNumConsumers(2);
+
+ createConnections(2);
+ client.consumer(0).setThinkTime(50);
+ runTestCase();
+
+ }
+
+ public void test_2_2_2_Selector() throws Exception {
+ client = new MockClient();
+ client.setNumProducers(2);
+ client.setDestCount(2);
+ client.setNumConsumers(2);
+
+ createConnections(2);
+
+ // Add properties to match producers to their consumers
+ for (int i = 0; i < 2; i++) {
+ String property = "match" + i;
+ client.consumer(i).setSelector(property);
+ client.producer(i).setProperty(property);
+ }
+
+ runTestCase();
+ }
+
+ /**
+ * Test sending with 1 high priority sender. The high priority sender should
+ * have higher throughput than the other low priority senders.
+ *
+ * @throws Exception
+ */
+ public void test_2_1_1_HighPriorityProducer() throws Exception {
+
+ client = new MockClient();
+ client.setNumProducers(2);
+ client.setNumConsumers(1);
+ client.setDestCount(1);
+
+ createConnections(1);
+ ProducerConnection producer = client.producer(0);
+ client.includeInRateReport(producer);
+ producer.setPriority(1);
+ producer.getRate().setName("High Priority Producer Rate");
+
+ client.consumer(0).setThinkTime(1);
+
+ runTestCase();
+ }
+
+ /**
+ * Test sending with 1 mixed priority sender: producer 0 is set to priority 1
+ * with a priority mod of 3, so it sends a mix of priority 0 and priority 1 messages.
+ *
+ * @throws Exception
+ */
+ public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
+ client = new MockClient();
+ client.setNumProducers(2);
+ client.setNumConsumers(1);
+ client.setDestCount(1);
+
+ createConnections(1);
+ ProducerConnection producer = client.producer(0);
+ producer.setPriority(1);
+ producer.setPriorityMod(3);
+ producer.getRate().setName("High Priority Producer Rate");
+
+ client.consumer(0).setThinkTime(1);
+ runTestCase();
+ }
+ /** Builds the dispatcher, broker(s), optional ptp queues and the configured client for one run; destCount is the number of destinations ("dest1".."destN"). */
+ private void createConnections(int destCount) throws Exception {
+ // The broker dispatcher is retained here; nothing in this method releases it.
+ dispatcher = createDispatcher("BrokerDispatcher");
+ dispatcher.retain();
+ // multibroker: distinct send/receive brokers; otherwise one broker (on sendBrokerURI) plays both roles.
+ if (multibroker) {
+ sendBroker = createBroker("SendBroker", sendBrokerURI);
+ rcvBroker = createBroker("RcvBroker", receiveBrokerURI);
+ brokers.add(sendBroker);
+ brokers.add(rcvBroker);
+ } else {
+ sendBroker = rcvBroker = createBroker("Broker", sendBrokerURI);
+ brokers.add(sendBroker);
+ }
+ // Queues are only created in ptp mode; the dests array itself is not used after the loop.
+ DestinationBuffer[] dests = new DestinationBuffer[destCount];
+
+ for (int i = 0; i < destCount; i++) {
+ DestinationBean bean = new DestinationBean();
+ bean.setName(new AsciiBuffer("dest" + (i + 1)));
+ bean.setPtp(ptp);
+ dests[i] = bean.freeze();
+ if (ptp) {
+ MockQueue queue = createQueue(sendBroker, dests[i]);
+ sendBroker.addQueue(queue);
+ if (multibroker) {
+ queue = createQueue(rcvBroker, dests[i]);
+ rcvBroker.addQueue(queue);
+ }
+ }
+ }
+ // A separately created client dispatcher is retained here and afterwards only reachable via client.getDispatcher().
+ Dispatcher clientDispatcher = null;
+ if (SEPARATE_CLIENT_DISPATCHER) {
+ clientDispatcher = createDispatcher("ClientDispatcher");
+ clientDispatcher.retain();
+ } else {
+ clientDispatcher = dispatcher;
+ }
+
+ // Configure Client:
+ client.setDispatcher(clientDispatcher);
+ client.setNumPriorities(PRIORITY_LEVELS);
+ client.setSendBrokerURI(sendBroker.getUri());
+ client.setReceiveBrokerURI(rcvBroker.getUri());
+ client.setPerformanceSamples(PERFORMANCE_SAMPLES);
+ client.setSamplingFrequency(1000 * SAMPLING_FREQUENCY); // MockClient hands this value to Thread.sleep, so the factor converts seconds to ms
+ client.setThreadsPerDispatcher(threadsPerDispatcher);
+ client.setPtp(ptp);
+ client.setTestName(getName());
+
+ client.createConnections();
+ }
+
+ private MockQueue createQueue(MockBroker broker, Destination destination) {
+ MockQueue queue = new MockQueue();
+ queue.setBroker(broker);
+ queue.setDestination(destination);
+ queue.setKeyExtractor(KEY_MAPPER);
+ if (usePartitionedQueue) {
+ queue.setPartitionMapper(PARTITION_MAPPER);
+ }
+ return queue;
+ }
+
+ private MockBroker createBroker(String name, String uri) {
+ MockBroker broker = new MockBroker();
+ broker.setName(name);
+ broker.setUri(uri);
+ broker.setDispatcher(dispatcher);
+ broker.setUseInputQueues(USE_INPUT_QUEUES);
+ return broker;
+ }
+
+ private void runTestCase() throws Exception {
+ // Start 'em up.
+ startServices();
+ try {
+ client.runTest();
+ } finally {
+ stopServices();
+ }
+ }
+
+ /**
+  * Stops every broker and then drops the dispatcher references taken while
+  * the connections were created.
+  *
+  * The releases run in a finally block so a broker that fails to stop no
+  * longer leaves the dispatcher retained. A client dispatcher that differs
+  * from the broker dispatcher (the SEPARATE_CLIENT_DISPATCHER case) is
+  * released as well; it was previously never released.
+  *
+  * @throws Exception
+  *             if a broker fails to stop
+  */
+ private void stopServices() throws Exception {
+     try {
+         for (MockBroker broker : brokers) {
+             broker.stopServices();
+         }
+     } finally {
+         Dispatcher clientDispatcher = (client != null) ? client.getDispatcher() : null;
+         try {
+             // Only a distinct client dispatcher carries its own retain.
+             if (clientDispatcher != null && clientDispatcher != dispatcher) {
+                 clientDispatcher.release();
+             }
+         } finally {
+             if (dispatcher != null) {
+                 dispatcher.release();
+             }
+         }
+     }
+ }
+
+ private void startServices() throws Exception {
+
+ for (MockBroker broker : brokers) {
+ broker.startServices();
+ }
+ }
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.io.FileInputStream;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.DispatcherConfig;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.Destination.DestinationBean;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.metric.Period;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.buffer.AsciiBuffer;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MockClient {
+
+ protected int performanceSamples = 3;
+ protected int samplingFrequency = 5000;
+
+ protected int numProducers = 1;
+ protected int numConsumers = 1;
+ protected int destCount = 1;
+ protected int numPriorities = 1;
+
+ // Set to mock up ptp:
+ protected boolean ptp = false;
+
+ protected String sendBrokerURI;
+ protected String receiveBrokerURI;
+
+ // Sets the number of threads to use:
+ protected int threadsPerDispatcher = Runtime.getRuntime().availableProcessors();
+
+ protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
+ protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
+ protected ArrayList<MetricCounter> additionalReportMetrics = new ArrayList<MetricCounter>();
+ protected boolean includeDetailedRates = false;
+
+ protected Dispatcher dispatcher;
+
+ public ConsumerConnection consumer(int index) {
+ return consumers.get(index);
+ }
+
+ public ProducerConnection producer(int index) {
+ return producers.get(index);
+ }
+
+ public int getThreadsPerDispatcher() {
+ return threadsPerDispatcher;
+ }
+
+ public void setThreadsPerDispatcher(int threadPoolSize) {
+ this.threadsPerDispatcher = threadPoolSize;
+ }
+
+ public void setIncludeDetailedRates(boolean includeDetailedRates) {
+ this.includeDetailedRates = includeDetailedRates;
+ }
+
+ public boolean getIncludeDetailedRates() {
+ return includeDetailedRates;
+ }
+
+ public void includeInRateReport(ProducerConnection producer) {
+ additionalReportMetrics.add(producer.getRate());
+ }
+
+ public void includeInRateReport(ConsumerConnection consumer) {
+ additionalReportMetrics.add(consumer.getRate());
+ }
+
+ public int getSamplingFrequency() {
+ return samplingFrequency;
+ }
+
+ public void setSamplingFrequency(int samplingFrequency) {
+ this.samplingFrequency = samplingFrequency;
+ }
+
+
+ public int getNumProducers() {
+ return numProducers;
+ }
+
+ public void setNumProducers(int numProducers) {
+ this.numProducers = numProducers;
+ }
+
+ public int getNumConsumers() {
+ return numConsumers;
+ }
+
+ public void setNumConsumers(int numConsumers) {
+ this.numConsumers = numConsumers;
+ }
+
+ public int getDestCount() {
+ return destCount;
+ }
+
+ public void setDestCount(int destCount) {
+ this.destCount = destCount;
+ }
+
+ public int getNumPriorities() {
+ return numPriorities;
+ }
+
+ public void setNumPriorities(int numPriorities) {
+ this.numPriorities = numPriorities;
+ }
+
+ public boolean isPtp() {
+ return ptp;
+ }
+
+ public void setPtp(boolean ptp) {
+ this.ptp = ptp;
+ }
+
+ public String getSendBrokerURI() {
+ return sendBrokerURI;
+ }
+
+ public void setSendBrokerURI(String sendBrokerURI) {
+ this.sendBrokerURI = sendBrokerURI;
+ }
+
+ public String getReceiveBrokerURI() {
+ return receiveBrokerURI;
+ }
+
+ public void setReceiveBrokerURI(String receiveBrokerURI) {
+ this.receiveBrokerURI = receiveBrokerURI;
+ }
+
+ public int getPerformanceSamples() {
+ return performanceSamples;
+ }
+
+
+ protected final AtomicLong msgIdGenerator = new AtomicLong();
+
+ final ArrayList<ProducerConnection> producers = new ArrayList<ProducerConnection>();
+ final ArrayList<ConsumerConnection> consumers = new ArrayList<ConsumerConnection>();
+
+ private String testName;
+
+ private void createConsumer(int i, String connectUri, Destination destination) throws URISyntaxException {
+ ConsumerConnection consumer = new ConsumerConnection();
+ consumer.setDestination(destination);
+ consumer.setName("consumer" + (i + 1));
+ consumer.setTotalConsumerRate(totalConsumerRate);
+ consumer.setDispatcher(dispatcher);
+ consumer.setConnectUri(connectUri);
+ consumers.add(consumer);
+ }
+
+ private void createProducer(int id, String connectUri, Destination destination) throws URISyntaxException {
+ ProducerConnection producer = new ProducerConnection();
+ producer.setProducerId(id + 1);
+ producer.setName("producer" + (id + 1));
+ producer.setDestination(destination);
+ producer.setMessageIdGenerator(msgIdGenerator);
+ producer.setTotalProducerRate(totalProducerRate);
+ producer.setDispatcher(dispatcher);
+ producer.setConnectUri(connectUri);
+ producers.add(producer);
+ }
+
+ private void reportRates() throws InterruptedException {
+ System.out.println("Checking rates for test: " + getTestName() + ", " + (ptp ? "ptp" : "topic"));
+ for (int i = 0; i < performanceSamples; i++) {
+ Period p = new Period();
+ Thread.sleep(samplingFrequency);
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ if (includeDetailedRates) {
+ System.out.println(totalProducerRate.getChildRateSummary(p));
+ System.out.println(totalConsumerRate.getChildRateSummary(p));
+ }
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+ }
+
+ public void setTestName(String testName) {
+ this.testName = testName;
+ }
+
+ public void setPerformanceSamples(int samples) {
+ this.performanceSamples = samples;
+ }
+
+ public String getTestName() {
+ return testName;
+ }
+
+ public void setDispatcher(Dispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ public void runTest() throws Exception {
+ getDispatcher().retain();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ private void startServices() throws Exception {
+// BaseTestConnection.setInShutdown(false, dispatcher);
+ for (ConsumerConnection connection : consumers) {
+ connection.retain();
+ }
+
+ for (ProducerConnection connection : producers) {
+ connection.retain();
+ }
+ }
+
+ private void stopServices() throws Exception {
+// BaseTestConnection.setInShutdown(true, dispatcher);
+ for (ProducerConnection connection : producers) {
+ connection.release();
+ }
+ for (ConsumerConnection connection : consumers) {
+ connection.release();
+ }
+ }
+
+ /**
+  * Creates {@code destCount} destinations named "dest1".."destN" and then
+  * the configured number of producers and consumers, assigning destinations
+  * round-robin by connection index. Producers connect to the send broker
+  * URI, consumers to the receive broker URI.
+  *
+  * @throws Exception
+  *             if a connection cannot be set up
+  */
+ public void createConnections() throws Exception {
+     DestinationBuffer[] targets = new DestinationBuffer[destCount];
+     for (int d = 0; d < destCount; d++) {
+         DestinationBean bean = new DestinationBean();
+         bean.setName(new AsciiBuffer("dest" + (d + 1)));
+         bean.setPtp(ptp);
+         targets[d] = bean.freeze();
+     }
+
+     int producerIndex = 0;
+     while (producerIndex < numProducers) {
+         createProducer(producerIndex, sendBrokerURI, targets[producerIndex % destCount]);
+         producerIndex++;
+     }
+
+     int consumerIndex = 0;
+     while (consumerIndex < numConsumers) {
+         createConsumer(consumerIndex, receiveBrokerURI, targets[consumerIndex % destCount]);
+         consumerIndex++;
+     }
+ }
+
+ public Dispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ protected Dispatcher createDispatcher() {
+ if (dispatcher == null) {
+ dispatcher = DispatcherConfig.create("client", threadsPerDispatcher);
+ }
+ return dispatcher;
+ }
+
+ /**
+ * Run the broker as a standalone app
+ *
+ * @param args
+ * The arguments.
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ MockClient test = new MockClient();
+ test.createDispatcher();
+
+ Properties props = new Properties();
+ if (args.length > 0) {
+ props.load(new FileInputStream(args[0]));
+ IntrospectionSupport.setProperties(test, props);
+ }
+ System.out.println(IntrospectionSupport.toString(test));
+ try
+ {
+ test.getDispatcher().retain();
+ test.createConnections();
+ test.runTest();
+ }
+ finally
+ {
+ test.getDispatcher().release();
+ }
+ }
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockQueue.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockQueue.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,202 @@
+/**
+ *
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.util.HashMap;
+
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.PersistencePolicy;
+import org.apache.activemq.queue.Subscription;
+import org.apache.activemq.util.Mapper;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class MockQueue implements DeliveryTarget {
+
+ HashMap<DeliveryTarget, Subscription<Message>> subs = new HashMap<DeliveryTarget, Subscription<Message>>();
+ private Destination destination;
+ private IQueue<Long, Message> queue;
+ private MockBroker broker;
+
+ private Mapper<Integer, Message> partitionMapper;
+ private Mapper<Long, Message> keyExtractor;
+// private final MockStoreAdapater store = new MockStoreAdapater();
+ private static final PersistencePolicy<Message> NO_PERSISTENCE = new PersistencePolicy.NON_PERSISTENT_POLICY<Message>();
+ /** Not implemented yet: always throws a RuntimeException; neither argument is used. */
+ public void add(Message msg, Runnable r) {
+ throw new RuntimeException("please implement me.");
+ }
+ public boolean hasSelector() {
+ return false;
+ }
+ public boolean match(Message message) {
+ return true;
+ }
+
+// private IQueue<Long, Message> createQueue() {
+//
+// if (partitionMapper != null) {
+// PartitionedQueue<Long, Message> queue = new PartitionedQueue<Long, Message>(destination.getName().toString()) {
+// @Override
+// public IQueue<Long, Message> createPartition(int partitionKey) {
+// return createSharedFlowQueue();
+// }
+// };
+// queue.setPartitionMapper(partitionMapper);
+// queue.setResourceName(destination.getName().toString());
+// queue.setStore(store);
+// queue.setPersistencePolicy(NO_PERSISTENCE);
+// queue.initialize(0, 0, 0, 0);
+// return queue;
+// } else {
+// return createSharedFlowQueue();
+// }
+// }
+//
+// private IQueue<Long, Message> createSharedFlowQueue() {
+// PrioritySizeLimiter<Message> limiter = new PrioritySizeLimiter<Message>(100, 1, MockBrokerTest.PRIORITY_LEVELS);
+// limiter.setPriorityMapper(Message.PRIORITY_MAPPER);
+// SharedPriorityQueue<Long, Message> queue = new SharedPriorityQueue<Long, Message>(destination.getName().toString(), limiter);
+// queue.setKeyMapper(keyExtractor);
+// queue.setAutoRelease(true);
+// queue.setDispatcher(broker.getDispatcher());
+// queue.setStore(store);
+// queue.setPersistencePolicy(NO_PERSISTENCE);
+// queue.initialize(0, 0, 0, 0);
+// return queue;
+// }
+//
+// public final void deliver(ISourceController<Message> source, Message msg) {
+// queue.add(msg, source);
+// }
+//
+ public final Destination getDestination() {
+ return destination;
+ }
+
+ public final void addConsumer(final DeliveryTarget dt) {
+ throw new RuntimeException("please implement me.");
+// Subscription<Message> sub = new Subscription<Message>() {
+//
+// public boolean isBrowser() {
+// return false;
+// }
+//
+// public boolean matches(Message message) {
+// return dt.match(message);
+// }
+//
+// public boolean isRemoveOnDispatch(Message message) {
+// return true;
+// }
+//
+// public boolean isExclusive() {
+// return false;
+// }
+//
+// public IFlowSink<Message> getSink() {
+// return dt.getSink();
+// }
+//
+// @Override
+// public String toString() {
+// return getSink().toString();
+// }
+//
+// public boolean hasSelector() {
+// return dt.hasSelector();
+// }
+//
+// public boolean offer(Message elem, ISourceController<?> controller, SubscriptionDelivery<Message> ackCallback) {
+// return getSink().offer(elem, controller);
+// }
+//
+// public void add(Message elem, ISourceController<?> controller, SubscriptionDelivery<Message> ackCallback) {
+// getSink().add(elem, controller);
+// }
+// };
+// subs.put(dt, sub);
+// queue.addSubscription(sub);
+ }
+//
+// public boolean removeSubscirption(final DeliveryTarget dt) {
+// Subscription<Message> sub = subs.remove(dt);
+// if (sub != null) {
+// return queue.removeSubscription(sub);
+// }
+// return false;
+// }
+//
+ public void start() throws Exception {
+// queue = createQueue();
+// queue.start();
+ }
+
+ public void stop() throws Exception {
+ }
+
+ public MockBroker getBroker() {
+ return broker;
+ }
+
+ public void setBroker(MockBroker broker) {
+ this.broker = broker;
+ }
+
+ public Mapper<Integer, Message> getPartitionMapper() {
+ return partitionMapper;
+ }
+
+ public void setPartitionMapper(Mapper<Integer, Message> partitionMapper) {
+ this.partitionMapper = partitionMapper;
+ }
+
+ public Mapper<Long, Message> getKeyExtractor() {
+ return keyExtractor;
+ }
+
+ public void setKeyExtractor(Mapper<Long, Message> keyExtractor) {
+ this.keyExtractor = keyExtractor;
+ }
+
+ public void setDestination(Destination destination) {
+ this.destination = destination;
+ }
+//
+// static final class MockStoreAdapater implements QueueStore<Long, Message> {
+//
+// MockStoreAdapater() {
+//
+// }
+//
+// public final void deleteQueueElement(SaveableQueueElement<Message> elem) {
+//
+// }
+//
+// public final boolean isFromStore(Message elem) {
+// return false;
+// }
+//
+// public final void persistQueueElement(SaveableQueueElement<Message> elem, ISourceController<?> controller, boolean delayable) {
+// // Noop;
+// }
+//
+// public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<Message> listener) {
+// throw new UnsupportedOperationException("Mock broker doesn't support persistence");
+// }
+//
+// public final void addQueue(QueueDescriptor queue) {
+//
+// }
+//
+// public final void deleteQueue(QueueDescriptor queue) {
+//
+// }
+//
+// }
+
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.actor.ActorProxy;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ProducerConnection extends ClientConnection {
+
+ private int priority;
+ private int priorityMod;
+ private int producerId;
+ private Destination destination;
+ private String property;
+ private MetricAggregator totalProducerRate;
+ private int payloadSize = 0;
+ private final MetricCounter rate = new MetricCounter();
+ private AtomicLong messageIdGenerator;
+
+ protected void createActor() {
+ actor = ActorProxy.create(Protocol.class, new ProducerProtocolImpl(), dispatchQueue);
+ }
+
+ class ProducerProtocolImpl extends ClientProtocolImpl {
+
+ private String filler;
+ private int payloadCounter;
+
+ @Override
+ public void start() {
+ rate.name("Producer " + name + " Rate");
+ totalProducerRate.add(rate);
+
+ if (payloadSize > 0) {
+ StringBuilder sb = new StringBuilder(payloadSize);
+ for (int i = 0; i < payloadSize; ++i) {
+ sb.append((char) ('a' + (i % 26)));
+ }
+ filler = sb.toString();
+ }
+ super.start();
+ }
+
+ @Override
+ public void onConnect() {
+ super.onConnect();
+ produceMessages();
+ }
+
+ protected void onSessionResume() {
+ produceMessages();
+ }
+
+ /**
+  * Sends messages until the session reports that sending is blocked. Each
+  * message uses the configured priority, except that with a positive
+  * {@code priorityMod} the messages whose current payload counter is a
+  * multiple of it are sent with priority 0.
+  */
+ private void produceMessages() {
+     while (!isSessionSendBlocked()) {
+         int effectivePriority = priority;
+         if (priorityMod > 0 && payloadCounter % priorityMod == 0) {
+             effectivePriority = 0;
+         }
+
+         // The id is drawn before the payload is built, as in the original argument order.
+         long messageId = messageIdGenerator.incrementAndGet();
+         Message next = new Message(messageId, producerId, createPayload(), null, destination, effectivePriority);
+         if (property != null) {
+             next.setProperty(property);
+         }
+
+         sessionSend(next);
+         rate.increment();
+     }
+ }
+
+ /**
+  * Builds the next payload and advances the payload counter.
+  *
+  * A negative {@code payloadSize} yields "name:counter". Otherwise the
+  * result is exactly {@code payloadSize} characters: the "name:counter:"
+  * header padded with filler, or the header cut short when it does not fit
+  * (so a size of 0 yields an empty string).
+  *
+  * @return the payload text for the next message
+  */
+ private String createPayload() {
+     if (payloadSize < 0) {
+         return name + ":" + (++payloadCounter);
+     }
+
+     StringBuilder sb = new StringBuilder(payloadSize);
+     sb.append(name).append(':').append(++payloadCounter).append(':');
+
+     int headerLength = sb.length();
+     if (headerLength > payloadSize) {
+         // Header alone is too long: truncate to the requested size.
+         return sb.substring(0, payloadSize);
+     }
+     sb.append(filler.subSequence(0, payloadSize - headerLength));
+     return sb.toString();
+ }
+
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ public int getPriorityMod() {
+ return priorityMod;
+ }
+
+ public void setPriorityMod(int priorityMod) {
+ this.priorityMod = priorityMod;
+ }
+
+ public int getProducerId() {
+ return producerId;
+ }
+
+ public void setProducerId(int producerId) {
+ this.producerId = producerId;
+ }
+
+ public Destination getDestination() {
+ return destination;
+ }
+
+ public void setDestination(Destination destination) {
+ this.destination = destination;
+ }
+
+ public String getProperty() {
+ return property;
+ }
+
+ public void setProperty(String property) {
+ this.property = property;
+ }
+
+ public MetricAggregator getTotalProducerRate() {
+ return totalProducerRate;
+ }
+
+ public void setTotalProducerRate(MetricAggregator totalProducerRate) {
+ this.totalProducerRate = totalProducerRate;
+ }
+
+ public int getPayloadSize() {
+ return payloadSize;
+ }
+
+ public void setPayloadSize(int payloadSize) {
+ this.payloadSize = payloadSize;
+ }
+
+ public MetricCounter getRate() {
+ return rate;
+ }
+
+ public AtomicLong getMessageIdGenerator() {
+ return messageIdGenerator;
+ }
+
+ public void setMessageIdGenerator(AtomicLong messageIdGenerator) {
+ this.messageIdGenerator = messageIdGenerator;
+ }
+
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Router.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Router.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Router.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,46 @@
+/**
+ *
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.util.buffer.AsciiBuffer;
+
+import static org.apache.activemq.dispatch.internal.RunnableSupport.*;
+
+/**
+ * Maps destination names to the delivery targets bound to them and fans each
+ * routed message out to the targets that accept it.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Router {
+    /** Delivery targets registered per destination name; written only by {@link #bind}. */
+    final HashMap<AsciiBuffer, Collection<DeliveryTarget>> lookupTable = new HashMap<AsciiBuffer, Collection<DeliveryTarget>>();
+
+    /**
+     * Registers a delivery target under the name of the given destination,
+     * creating the per-destination collection on first use.
+     *
+     * @param dt the target to register
+     * @param destination the destination whose name is used as the lookup key
+     */
+    final synchronized void bind(DeliveryTarget dt, Destination destination) {
+        AsciiBuffer key = destination.getName();
+        Collection<DeliveryTarget> targets = lookupTable.get(key);
+        if (targets == null) {
+            targets = new ArrayList<DeliveryTarget>();
+            lookupTable.put(key, targets);
+        }
+        targets.add(dt);
+    }
+
+    /**
+     * Offers a message to every delivery target bound to the message's destination name.
+     * <p>
+     * The completion runnable is created by {@code runOnceAfter} with the number of bound
+     * targets as its count, so it must be triggered once per bound target. Targets whose
+     * {@code match} accepts the message receive the runnable through {@code add}; for
+     * targets that reject the message the runnable is triggered here. Without that, a
+     * single non-matching target would keep the count from ever being reached and
+     * {@code onRouteCompleted} would never fire. When nothing is bound to the destination
+     * the completion is signalled directly instead of being dropped.
+     *
+     * @param msg the message to route; its destination name selects the targets
+     * @param queue the queue handed to {@code runOnceAfter} for the completion callback
+     * @param onRouteCompleted callback signalling that routing of {@code msg} has finished
+     */
+    final void route(Message msg, DispatchQueue queue, Runnable onRouteCompleted) {
+        // NOTE(review): bind() is synchronized but this lookup and iteration are not.
+        // Presumably all bind() calls finish before routing starts - confirm with callers,
+        // otherwise HashMap/ArrayList are being read while they may be mutated.
+        AsciiBuffer key = msg.getDestination().getName();
+        Collection<DeliveryTarget> targets = lookupTable.get(key);
+        if (targets == null || targets.isEmpty()) {
+            completeNow(queue, onRouteCompleted);
+            return;
+        }
+
+        Runnable r = runOnceAfter(queue, onRouteCompleted, targets.size());
+        for (DeliveryTarget dt : targets) {
+            if (dt.match(msg)) {
+                dt.add(msg, r);
+            } else {
+                // This target will never report back, so account for it ourselves
+                // (assumes each run() of r counts as one of the expected completions).
+                r.run();
+            }
+        }
+    }
+
+    /**
+     * Signals completion when there is no target to wait for: dispatches the callback
+     * on {@code queue} when one is given, otherwise runs it inline. A null callback is ignored.
+     */
+    private static void completeNow(DispatchQueue queue, Runnable onRouteCompleted) {
+        if (onRouteCompleted == null) {
+            return;
+        }
+        if (queue != null) {
+            queue.dispatchAsync(onRouteCompleted);
+        } else {
+            onRouteCompleted.run();
+        }
+    }
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/Transport.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/Transport.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/Transport.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.transport;
+
+import org.apache.activemq.dispatch.DispatchObject;
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ * A dispatch object over which messages are sent to a remote end and whose
+ * events are reported to a {@link TransportHandler}.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Transport extends DispatchObject {
+
+    /**
+     * Installs the handler that receives this transport's events.
+     *
+     * @param hanlder the handler to notify
+     */
+    void setHandler(TransportHandler hanlder);
+
+    /**
+     * Sends a message without any completion notification.
+     *
+     * @param message the message to send
+     */
+    void send(Object message);
+
+    /**
+     * Sends a message and supplies a completion callback.
+     *
+     * @param message the message to send
+     * @param onCompleted callback associated with this send; may be null in the pipe implementation
+     * @param queue queue for running {@code onCompleted}; may be null in the pipe implementation
+     */
+    void send(Object message, Runnable onCompleted, DispatchQueue queue);
+
+    /**
+     * Returns the address string describing the remote end of this transport.
+     */
+    String getRemoteAddress();
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactory.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactory.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactory.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.transport;
+
+import org.apache.activemq.dispatch.Dispatcher;
+
+/**
+ * Creates client transports and transport servers from URI strings.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface TransportFactory {
+
+ /**
+  * Creates a client-side transport for the given URI.
+  *
+  * @param dispatcher the dispatcher the transport is created with
+  * @param connectUri the URI string identifying what to connect to
+  * @return the new transport
+  */
+ Transport connect(Dispatcher dispatcher, String connectUri);
+
+ /**
+  * Creates a transport server for the given URI.
+  *
+  * @param dispatcher the dispatcher the server is created with
+  * @param bindUri the URI string identifying where to bind
+  * @return the new transport server
+  */
+ TransportServer bind(Dispatcher dispatcher, String bindUri);
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactorySystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactorySystem.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactorySystem.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactorySystem.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.transport;
+
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.queue.actor.transport.pipe.PipeTransportFactory;
+
+
+
+/**
+ * Static entry points for creating transports. Every call is served by a
+ * freshly created {@link PipeTransportFactory}, regardless of the URI passed in.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class TransportFactorySystem {
+
+    /**
+     * Creates a client transport through a new {@link PipeTransportFactory}.
+     *
+     * @param dispatcher the dispatcher forwarded to the factory
+     * @param connectUri the URI string forwarded to the factory
+     * @return whatever the factory's {@code connect} returns
+     */
+    public static Transport connect(Dispatcher dispatcher, String connectUri) {
+        final TransportFactory factory = new PipeTransportFactory();
+        return factory.connect(dispatcher, connectUri);
+    }
+
+    /**
+     * Creates a transport server through a new {@link PipeTransportFactory}.
+     *
+     * @param dispatcher the dispatcher forwarded to the factory
+     * @param bindUri the URI string forwarded to the factory
+     * @return whatever the factory's {@code bind} returns
+     */
+    public static TransportServer bind(Dispatcher dispatcher, String bindUri) {
+        final TransportFactory factory = new PipeTransportFactory();
+        return factory.bind(dispatcher, bindUri);
+    }
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportHandler.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportHandler.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportHandler.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.queue.actor.transport;
+
+/**
+ * Callbacks through which a transport reports its events.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface TransportHandler {
+ /** Called when the transport has connected. */
+ void onConnect();
+ /**
+  * Called with each message delivered to the transport.
+  * NOTE(review): the method name looks like a misspelling of "onReceive"; it is
+  * part of the interface, so renaming it would have to be done across all implementers.
+  */
+ void onRecevie(Object message);
+ /** Called with the exception when the transport reports a failure. */
+ void onFailure(Exception failure);
+ /** Called when the transport reports a disconnect. */
+ void onDisconnect();
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServer.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServer.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServer.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.transport;
+
+import org.apache.activemq.dispatch.DispatchObject;
+
+/**
+ * A dispatch object that accepts transports and reports its events to a
+ * {@link TransportServerHandler}.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface TransportServer extends DispatchObject {
+
+ /**
+  * Installs the handler that receives this server's events.
+  *
+  * @param handler the handler to notify
+  */
+ void setHandler(TransportServerHandler handler);
+
+ /**
+  * Returns the URI string associated with this server (the pipe implementation
+  * returns the bind URI it was created with).
+  */
+ String getConnectURI();
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServerHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServerHandler.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServerHandler.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServerHandler.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.transport;
+
+/**
+ * Callbacks through which a {@link TransportServer} reports its events.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface TransportServerHandler {
+
+ /** Called once the server has been bound. */
+ void onBind();
+ /** Called once the server has been unbound. */
+ void onUnbind();
+
+ /**
+  * Called with the server-side transport of a newly accepted connection.
+  *
+  * @param transport the accepted transport
+  */
+ void onAccept(Transport transport);
+ /** Called with the exception when the server reports a failure (e.g. a failed bind). */
+ void onFailure(Exception failure);
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.transport.pipe;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.actor.ActorProxy;
+import org.apache.activemq.dispatch.DispatchObject;
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.queue.actor.transport.Transport;
+import org.apache.activemq.queue.actor.transport.TransportFactory;
+import org.apache.activemq.queue.actor.transport.TransportHandler;
+import org.apache.activemq.queue.actor.transport.TransportServer;
+import org.apache.activemq.queue.actor.transport.TransportServerHandler;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class PipeTransportFactory implements TransportFactory {
+
+ static protected final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
+
+ static void perform_unbind(PipeTransportServer server) {
+ synchronized(servers) {
+ servers.remove(server.name);
+ }
+ }
+
+ static void perform_bind(PipeTransportServer server) throws IOException {
+ synchronized(servers) {
+ if (servers.containsKey(server.name)) {
+ throw new IOException("Server already bound: " + server.name);
+ }
+ servers.put(server.name, server);
+ }
+ }
+
+ static void perform_connect(PipeTransport clientTransport) throws IOException {
+ PipeTransportServer server;
+ synchronized(servers) {
+ server = servers.get(clientTransport.name);
+ if( server == null ) {
+ throw new IOException("Server not bound: " + clientTransport.name);
+ }
+ }
+ PipeTransport serverTransport = new PipeTransport(server.dispatcher);
+ clientTransport.peer = serverTransport.actor;
+ serverTransport.peer = clientTransport.actor;
+ server.actor.onConnect(serverTransport);
+ }
+
+
+ /**
+  * Creates a pipe transport server from a bind URI. The URI host becomes the
+  * server name and the URI parameters are applied to the server as properties.
+  *
+  * @param dispatcher the dispatcher the server is created with
+  * @param bindUri the URI string to bind; also stored as the server's connect URI
+  * @return the configured server
+  * @throws IllegalArgumentException if the URI cannot be parsed, or if entries are
+  *         still present in the parameter map after the properties were applied
+  */
+ public TransportServer bind(Dispatcher dispatcher, String bindUri) {
+     final String hostName;
+     final Map<String, String> params;
+     try {
+         final URI parsed = new URI(bindUri);
+         hostName = parsed.getHost();
+         params = URISupport.parseParamters(parsed);
+     } catch (URISyntaxException e) {
+         throw new IllegalArgumentException("Invalid bind uri: " + e, e);
+     }
+
+     final PipeTransportServer server = new PipeTransportServer(dispatcher);
+     server.connectURI = bindUri;
+     // Presumably setProperties removes every entry it managed to apply, so
+     // whatever is left in the map was not understood - TODO confirm.
+     IntrospectionSupport.setProperties(server, params);
+     if (params.isEmpty()) {
+         server.name = hostName;
+         return server;
+     }
+     throw new IllegalArgumentException("Invalid bind uri parameters: " + params);
+ }
+
+ /**
+  * Creates a client pipe transport from a connect URI. The URI host becomes the
+  * transport name (the key later used to look up the server) and the URI
+  * parameters are applied to the transport as properties.
+  *
+  * @param dispatcher the dispatcher the transport is created with
+  * @param connectUri the URI string to connect to; also stored as the connect address
+  * @return the configured client transport
+  * @throws IllegalArgumentException if the URI cannot be parsed, or if entries are
+  *         still present in the parameter map after the properties were applied
+  */
+ public Transport connect(Dispatcher dispatcher, String connectUri) {
+     final String hostName;
+     final Map<String, String> params;
+     try {
+         final URI parsed = new URI(connectUri);
+         hostName = parsed.getHost();
+         params = URISupport.parseParamters(parsed);
+     } catch (URISyntaxException e) {
+         throw new IllegalArgumentException("Invalid connect uri: " + e, e);
+     }
+
+     final PipeTransport client = new PipeTransport(dispatcher);
+     // Presumably setProperties removes every entry it managed to apply, so
+     // whatever is left in the map was not understood - TODO confirm.
+     IntrospectionSupport.setProperties(client, params);
+     if (params.isEmpty()) {
+         client.connnectAddress = connectUri;
+         client.name = hostName;
+         return client;
+     }
+     throw new IllegalArgumentException("Invalid connect uri parameters: " + params);
+ }
+
+ /**
+  * A {@link DispatchObject} that forwards every call to another dispatch object
+  * held in {@code next}.
+  */
+ public static class DispatchObjectFilter implements DispatchObject {
+     /** The object every call is forwarded to; stays null when the no-arg constructor is used. */
+     protected DispatchObject next;
+
+     /** Creates a filter without a delegate; {@code next} must be assigned before use. */
+     public DispatchObjectFilter() {
+     }
+
+     /**
+      * Creates a filter forwarding to the given object.
+      *
+      * @param next the delegate
+      */
+     public DispatchObjectFilter(DispatchObject next) {
+         this.next = next;
+     }
+
+     // --- lifecycle -------------------------------------------------------
+
+     public void retain() {
+         next.retain();
+     }
+
+     public void release() {
+         next.release();
+     }
+
+     public void suspend() {
+         next.suspend();
+     }
+
+     public void resume() {
+         next.resume();
+     }
+
+     public void addShutdownWatcher(Runnable shutdownWatcher) {
+         next.addShutdownWatcher(shutdownWatcher);
+     }
+
+     // --- context and target queue ----------------------------------------
+
+     public <Context> Context getContext() {
+         return next.getContext();
+     }
+
+     public <Context> void setContext(Context context) {
+         next.setContext(context);
+     }
+
+     public DispatchQueue getTargetQueue() {
+         return next.getTargetQueue();
+     }
+
+     public void setTargetQueue(DispatchQueue queue) {
+         next.setTargetQueue(queue);
+     }
+ }
+
+ /**
+  * The events a pipe transport server reacts to; used as the interface type for
+  * the server's {@code ActorProxy}.
+  */
+ interface PipeTransportServerActor {
+     /** Requests registration of the server. */
+     void onBind();
+
+     /**
+      * Announces a new connection.
+      *
+      * @param serverSide the server-side transport of the connection
+      */
+     void onConnect(PipeTransport serverSide);
+
+     /** Requests removal of the server's registration. */
+     void onUnbind();
+ }
+
+ /**
+  * Transport server for in-VM pipes. It owns a serial dispatch queue that is
+  * suspended on construction and reaches itself through an actor proxy created
+  * over that queue.
+  */
+ protected class PipeTransportServer extends DispatchObjectFilter implements TransportServer, PipeTransportServerActor {
+
+     protected final AtomicInteger suspendCounter = new AtomicInteger();
+     /** Source of the numeric suffix appended to each accepted transport's remote address. */
+     protected long connectionCounter;
+
+     protected String connectURI;
+     protected String name;
+     protected TransportServerHandler handler;
+     protected String wireFormat;
+     protected WireFormatFactory wireFormatFactory;
+     protected boolean marshal;
+
+     private final Dispatcher dispatcher;
+     private final DispatchQueue dispatchQueue;
+     private final PipeTransportServerActor actor;
+
+     /**
+      * Creates the server on a new serial queue, suspends that queue and then
+      * issues the bind request through the actor proxy. Presumably the request
+      * stays queued until the server is resumed - confirm against ActorProxy.
+      *
+      * @param dispatcher the dispatcher providing the serial queue
+      */
+     public PipeTransportServer(Dispatcher dispatcher) {
+         super( dispatcher.createSerialQueue(null) );
+         this.dispatcher = dispatcher;
+         this.dispatchQueue = (DispatchQueue) next;
+         this.dispatchQueue.suspend();
+         this.actor = ActorProxy.create(PipeTransportServerActor.class, this, this.dispatchQueue);
+         this.actor.onBind();
+     }
+
+     // --- PipeTransportServerActor ----------------------------------------
+
+     /** Registers this server and reports success or the bind failure to the handler. */
+     public void onBind() {
+         try {
+             perform_bind(this);
+             handler.onBind();
+         } catch (IOException bindFailure) {
+             handler.onFailure(bindFailure);
+         }
+     }
+
+     /**
+      * Stamps the accepted transport with a remote address made of the connect URI,
+      * a '#' and a running connection number, then passes it to the handler.
+      */
+     public void onConnect(PipeTransport serverSide) {
+         final long id = connectionCounter;
+         connectionCounter = id + 1;
+         serverSide.remoteAddress = connectURI.toString() + "#" + id;
+         handler.onAccept(serverSide);
+     }
+
+     /** Removes this server's registration and notifies the handler. */
+     public void onUnbind() {
+         perform_unbind(this);
+         handler.onUnbind();
+     }
+
+     // --- TransportServer ---------------------------------------------------
+
+     public void setHandler(TransportServerHandler handler) {
+         this.handler = handler;
+     }
+
+     public String getConnectURI() {
+         return connectURI;
+     }
+
+     // --- configuration properties ------------------------------------------
+
+     public String getWireFormat() {
+         return wireFormat;
+     }
+
+     public void setWireFormat(String wireFormat) {
+         this.wireFormat = wireFormat;
+     }
+
+     public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
+         this.wireFormatFactory = wireFormatFactory;
+     }
+
+     public boolean isMarshal() {
+         return marshal;
+     }
+
+     public void setMarshal(boolean marshal) {
+         this.marshal = marshal;
+     }
+
+ }
+
+ /**
+  * The events a pipe transport reacts to; used as the interface type for the
+  * transport's {@code ActorProxy} and for the reference to its peer.
+  */
+ interface PipeTransportActor {
+     /** Requests that the transport establish its connection. */
+     void onConnect();
+
+     /**
+      * Delivers a message to the transport.
+      *
+      * @param message the delivered message
+      * @param onCompleted callback to trigger once delivery handling is done; may be null
+      * @param queue queue on which to dispatch {@code onCompleted}; may be null
+      */
+     void onDispatch(Object message, Runnable onCompleted, DispatchQueue queue);
+
+     /** Announces a disconnect. */
+     void onDisconnect();
+
+     /**
+      * Announces a failure.
+      *
+      * @param e the failure
+      */
+     void onFailure(Exception e);
+ }
+
+ /**
+  * One end of an in-VM pipe. Messages passed to {@code send} are handed to the
+  * peer's actor; messages arriving through {@code onDispatch} are passed to the
+  * installed {@link TransportHandler}.
+  */
+ protected static class PipeTransport extends DispatchObjectFilter implements PipeTransportActor, Transport {
+
+     /** Set for client-side transports only; a null value marks the server side. */
+     public String connnectAddress;
+     public String remoteAddress;
+
+     protected final AtomicInteger suspendCounter = new AtomicInteger();
+
+     private PipeTransportActor actor;
+     private PipeTransportActor peer;
+     private DispatchQueue dispatchQueue;
+     private TransportHandler handler;
+     private String name;
+     private WireFormat wf;
+     private String wireFormat;
+     private boolean marshal;
+
+     /**
+      * Creates the transport on a new, suspended serial queue.
+      *
+      * @param dispatcher the dispatcher providing the serial queue
+      */
+     public PipeTransport(Dispatcher dispatcher) {
+         super( dispatcher.createSerialQueue(null) );
+         this.dispatchQueue = (DispatchQueue) next;
+         this.dispatchQueue.suspend();
+
+         // Enqueue the connect event now, while suspended, so that it is the very
+         // first thing executed once this object is resumed.
+         this.actor = ActorProxy.create(PipeTransportActor.class, this, this.dispatchQueue);
+         this.actor.onConnect();
+     }
+
+     // --- Transport -----------------------------------------------------------
+
+     public void setHandler(TransportHandler hanlder) {
+         this.handler = hanlder;
+     }
+
+     public String getRemoteAddress() {
+         return remoteAddress;
+     }
+
+     /** Sends a message with neither a completion callback nor a completion queue. */
+     public void send(Object message) {
+         send(message, null, null);
+     }
+
+     /**
+      * Hands a message to the peer, marshalling it first when a wire format is set
+      * and marshalling is enabled. If there is no peer or marshalling fails, the
+      * failure is reported through this transport's own actor and the completion
+      * callback is still triggered.
+      */
+     public void send(Object message, Runnable onCompleted, DispatchQueue queue) {
+         Object outbound = message;
+         try {
+             if (peer == null) {
+                 throw new IOException("not connected");
+             }
+             if (wf != null && marshal) {
+                 outbound = wf.marshal(outbound);
+             }
+         } catch (IOException sendFailure) {
+             actor.onFailure(sendFailure);
+             complete(onCompleted, queue);
+             return;
+         }
+         peer.onDispatch(outbound, onCompleted, queue);
+     }
+
+     // --- PipeTransportActor --------------------------------------------------
+
+     /**
+      * Establishes the connection. A client (connect address set) looks up and links
+      * to its server; a server-side transport only verifies that it has already been
+      * given a peer and a remote address. The handler is told about success or failure.
+      */
+     public void onConnect() {
+         try {
+             if (connnectAddress != null) {
+                 // Client side connect case...
+                 perform_connect(this);
+                 remoteAddress = connnectAddress;
+             } else if (peer == null || remoteAddress == null) {
+                 // Server side connect case, but nobody linked us up.
+                 throw new IOException("Server transport not properly initialized.");
+             }
+             handler.onConnect();
+         } catch (IOException connectFailure) {
+             handler.onFailure(connectFailure);
+         }
+     }
+
+     /**
+      * Delivers an incoming message to the handler, unmarshalling it first when a
+      * wire format is set and marshalling is enabled. The completion callback is
+      * triggered in every case, including an unmarshalling failure.
+      */
+     public void onDispatch(Object message, Runnable onCompleted, DispatchQueue queue) {
+         try {
+             final boolean decode = wf != null && marshal;
+             Object payload = message;
+             if (decode) {
+                 try {
+                     payload = wf.unmarshal((Buffer) message);
+                 } catch (IOException decodeFailure) {
+                     handler.onFailure(decodeFailure);
+                     return;
+                 }
+             }
+             handler.onRecevie(payload);
+         } finally {
+             complete(onCompleted, queue);
+         }
+     }
+
+     public void onDisconnect() {
+         handler.onDisconnect();
+     }
+
+     public void onFailure(Exception e) {
+         handler.onFailure(e);
+     }
+
+     // --- configuration properties ----------------------------------------------
+
+     public String getWireFormat() {
+         return wireFormat;
+     }
+
+     public void setWireFormat(String wireFormat) {
+         this.wireFormat = wireFormat;
+     }
+
+     // --- helpers -----------------------------------------------------------------
+
+     /**
+      * Triggers a completion callback: nothing for a null callback, an inline run
+      * when no queue is given, an asynchronous dispatch on the queue otherwise.
+      */
+     private void complete(Runnable onCompleted, DispatchQueue queue) {
+         if (onCompleted == null) {
+             return;
+         }
+         if (queue == null) {
+             onCompleted.run();
+             return;
+         }
+         queue.dispatchAsync(onCompleted);
+     }
+
+ }
+}