You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/12 20:05:06 UTC
svn commit: r743841 - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/flow/ test/java/org/apache/activemq/flow/
Author: chirino
Date: Thu Feb 12 19:05:05 2009
New Revision: 743841
URL: http://svn.apache.org/viewvc?rev=743841&view=rev
Log:
test case is now working against a real IO impl.
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
Removed:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalProducer.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestFlowManager.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Thu Feb 12 19:05:05 2009
@@ -91,7 +91,7 @@
this.flow = flow;
this.limiter = limiter == null ? new SizeLimiter<E>(0, 0) : limiter;
this.mutex = mutex;
- this.name = controllable.getFlowSource().toString();
+ this.name = controllable.toString();
}
public final IFlowLimiter<E> getLimiter() {
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java Thu Feb 12 19:05:05 2009
@@ -61,22 +61,20 @@
public static final int BLOCKING = 0;
public static final int POLLING = 1;
public static final int ASYNC = 2;
- private final int dispatchMode;
AbstractTestConnection(MockBroker broker, String name, Flow flow, Pipe<Message> p) {
this.name = name;
this.broker = broker;
this.flow = flow;
- this.dispatchMode = broker.dispatchMode;
// Set up an input source:
this.input = new NetworkSource(flow, name + "-INPUT", inputWindowSize, inputResumeThreshold);
// Setup output queue:
- if (broker.priorityLevels <= 1) {
- this.output = broker.getFlowManager().createFlowQueue(flow, name + "-OUTPUT", outputQueueSize, resumeThreshold);
+ if (MockBrokerTest.PRIORITY_LEVELS <= 1) {
+ this.output = TestFlowManager.createFlowQueue(flow, name + "-OUTPUT", outputQueueSize, resumeThreshold);
} else {
- ExclusivePriorityQueue<Message> t = new ExclusivePriorityQueue<Message>(broker.priorityLevels, flow, name + "-OUTPUT", outputQueueSize, resumeThreshold);
+ ExclusivePriorityQueue<Message> t = new ExclusivePriorityQueue<Message>(MockBrokerTest.PRIORITY_LEVELS, flow, name + "-OUTPUT", outputQueueSize, resumeThreshold);
t.setPriorityMapper(Message.PRIORITY_MAPPER);
this.output = t;
}
@@ -130,8 +128,8 @@
}
public final void simulateEncodingWork() {
- if (broker.ioWorkAmount > 1) {
- fib(broker.ioWorkAmount);
+ if (MockBrokerTest.IO_WORK_AMOUNT > 1) {
+ fib(MockBrokerTest.IO_WORK_AMOUNT);
}
}
@@ -187,7 +185,7 @@
public final void start() throws Exception {
running.set(true);
- if (dispatchMode == BLOCKING) {
+ if (MockBrokerTest.DISPATCH_MODE == BLOCKING) {
listener = new Thread(new Runnable() {
public void run() {
try {
@@ -212,8 +210,8 @@
}, name + "-Sender");
sender.start();
} else {
- output.setDispatcher(broker.dispatcher);
- input.setDispatcher(broker.dispatcher);
+ output.setDispatcher(broker.getDispatcher());
+ input.setDispatcher(broker.getDispatcher());
return;
}
@@ -221,7 +219,7 @@
public final void stop() throws Exception {
running.set(false);
- if (dispatchMode == BLOCKING) {
+ if (MockBrokerTest.DISPATCH_MODE == BLOCKING) {
listener.interrupt();
listener.join();
sender.interrupt();
@@ -262,16 +260,16 @@
public NetworkSource(Flow flow, String name, int capacity, int resumeThreshold) {
super(name);
if (flow == null) {
- if (broker.useInputQueues) {
- inputQueue = broker.getFlowManager().createFlowQueue(flow, name, capacity, resumeThreshold);
+ if (MockBrokerTest.USE_INPUT_QUEUES) {
+ inputQueue = TestFlowManager.createFlowQueue(flow, name, capacity, resumeThreshold);
} else {
inputQueue = null;
}
flowController = null;
} else {
- if (broker.useInputQueues) {
- if (broker.priorityLevels <= 1) {
- inputQueue = broker.getFlowManager().createFlowQueue(flow, name, capacity, resumeThreshold);
+ if (MockBrokerTest.USE_INPUT_QUEUES) {
+ if (MockBrokerTest.PRIORITY_LEVELS <= 1) {
+ inputQueue = TestFlowManager.createFlowQueue(flow, name, capacity, resumeThreshold);
} else {
SingleFlowPriorityQueue<Message> t = new SingleFlowPriorityQueue<Message>(flow, name, new SizeLimiter<Message>(capacity, resumeThreshold));
t.setPriorityMapper(Message.PRIORITY_MAPPER);
@@ -332,7 +330,7 @@
// priority. Note that flow control from the input queue will limit
// dispatch
// of lower priority messages:
- if (broker.useInputQueues) {
+ if (MockBrokerTest.USE_INPUT_QUEUES) {
dispatchContext.updatePriority(Message.MAX_PRIORITY);
}
dispatchContext.requestDispatch();
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java Thu Feb 12 19:05:05 2009
@@ -19,11 +19,12 @@
import java.io.Serializable;
import java.util.HashSet;
-import org.apache.activemq.flow.Flow;
import org.apache.activemq.queue.Mapper;
public class Message implements Serializable {
+ private static final long serialVersionUID = 6759761889075451996L;
+
public static final Mapper<Integer, Message> PRIORITY_MAPPER = new Mapper<Integer, Message>() {
public Integer map(Message element) {
return element.priority;
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java Thu Feb 12 19:05:05 2009
@@ -3,16 +3,14 @@
*/
package org.apache.activemq.flow;
-import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.flow.MockBrokerTest.BrokerConnection;
import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
-import org.apache.activemq.flow.MockBrokerTest.Router;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
@@ -20,55 +18,24 @@
class MockBroker implements TransportAcceptListener {
- private final MockBrokerTest mockBrokerTest;
- private final TestFlowManager flowMgr;
+ final Router router= new Router();
final ArrayList<RemoteConnection> connections = new ArrayList<RemoteConnection>();
- final ArrayList<LocalProducer> producers = new ArrayList<LocalProducer>();
- final ArrayList<LocalConsumer> consumers = new ArrayList<LocalConsumer>();
- private final ArrayList<BrokerConnection> brokerConns = new ArrayList<BrokerConnection>();
-
- private final HashMap<Destination, MockQueue> queues = new HashMap<Destination, MockQueue>();
- final Router router;
- private int pCount;
- private int cCount;
- private final String name;
- public final int dispatchMode;
-
- public final IDispatcher dispatcher;
- public final int priorityLevels = MockBrokerTest.PRIORITY_LEVELS;
- public final int ioWorkAmount = MockBrokerTest.IO_WORK_AMOUNT;
- public final boolean useInputQueues = MockBrokerTest.USE_INPUT_QUEUES;
- public TransportServer transportServer;
-
- MockBroker(MockBrokerTest mockBrokerTest, String name) throws IOException, URISyntaxException {
- this.mockBrokerTest = mockBrokerTest;
- this.flowMgr = new TestFlowManager();
- this.router = this.mockBrokerTest.new Router();
- this.name = name;
- this.dispatchMode = this.mockBrokerTest.dispatchMode;
- this.dispatcher = this.mockBrokerTest.dispatcher;
- }
-
- TestFlowManager getFlowManager() {
- return flowMgr;
- }
+ final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
+ final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
+ final ArrayList<BrokerConnection> brokerConnections = new ArrayList<BrokerConnection>();
+ final HashMap<Destination, MockQueue> queues = new HashMap<Destination, MockQueue>();
+
+ private TransportServer transportServer;
+ private String uri;
+ private String name;
+ private IDispatcher dispatcher;
+ private final AtomicBoolean stopping = new AtomicBoolean();
public String getName() {
return name;
}
- public void createProducerConnection(Destination destination) {
- LocalProducer c = new LocalProducer(this.mockBrokerTest, "producer" + ++pCount, this, destination);
- producers.add(c);
- }
-
- public void createConsumerConnection(Destination destination) {
- LocalConsumer c = new LocalConsumer(this.mockBrokerTest, "consumer" + ++cCount, this, destination);
- consumers.add(c);
- subscribe(destination, c);
- }
-
public void subscribe(Destination destination, DeliveryTarget deliveryTarget) {
if (destination.ptp) {
queues.get(destination).addConsumer(deliveryTarget);
@@ -77,43 +44,44 @@
}
}
- public void createClusterConnection(Destination destination) {
- LocalConsumer c = new LocalConsumer(this.mockBrokerTest, "consumer" + ++cCount, this, destination);
- consumers.add(c);
- router.bind(c, destination);
- }
-
- public void createQueue(Destination destination) {
- MockQueue queue = new MockQueue(this.mockBrokerTest, this, destination);
- queues.put(destination, queue);
- }
- public void createBrokerConnection(MockBroker target, Pipe<Message> pipe) {
- BrokerConnection bc = this.mockBrokerTest.new BrokerConnection(this, target, pipe);
- // Set up the pipe for polled access
- if (dispatchMode != AbstractTestConnection.BLOCKING) {
- pipe.setMode(Pipe.POLLING);
- }
- // Add subscriptions for the target's destinations:
- for (Destination d : target.router.lookupTable.keySet()) {
- router.bind(bc, d);
- }
- brokerConns.add(bc);
- }
+ public void addQueue(MockQueue queue) {
+ router.bind(queue, queue.getDestination());
+ queues.put(queue.getDestination(), queue);
+ }
+
+// public void createClusterConnection(Destination destination) {
+// RemoteConsumer c = new RemoteConsumer(this.mockBrokerTest, "consumer" + ++consumerCounter, this, destination);
+// consumers.add(c);
+// router.bind(c, destination);
+// }
+// public void createBrokerConnection(MockBroker target, Pipe<Message> pipe) {
+// BrokerConnection bc = this.mockBrokerTest.new BrokerConnection(this, target, pipe);
+// // Set up the pipe for polled access
+// if (dispatchMode != AbstractTestConnection.BLOCKING) {
+// pipe.setMode(Pipe.POLLING);
+// }
+// // Add subscriptions for the target's destinations:
+// for (Destination d : target.router.lookupTable.keySet()) {
+// router.bind(bc, d);
+// }
+// brokerConns.add(bc);
+// }
final void stopServices() throws Exception {
+ stopping.set(true);
transportServer.stop();
- for (RemoteConnection connection : connections) {
+ for (RemoteProducer connection : producers) {
connection.stop();
}
- for (LocalProducer connection : producers) {
+ for (RemoteConsumer connection : consumers) {
connection.stop();
}
- for (LocalConsumer connection : consumers) {
+ for (RemoteConnection connection : connections) {
connection.stop();
}
- for (BrokerConnection connection : brokerConns) {
+ for (BrokerConnection connection : brokerConnections) {
connection.stop();
}
for (MockQueue queue : queues.values()) {
@@ -125,25 +93,25 @@
final void startServices() throws Exception {
- transportServer = TransportFactory.bind(new URI("tcp://localhost:61616?wireFormat=test"));
+ transportServer = TransportFactory.bind(new URI(uri));
transportServer.setAcceptListener(this);
transportServer.start();
dispatcher.start();
- for (LocalConsumer connection : consumers) {
- connection.start();
- }
-
for (MockQueue queue : queues.values()) {
queue.start();
}
+
+ for (RemoteConsumer connection : consumers) {
+ connection.start();
+ }
- for (LocalProducer connection : producers) {
+ for (RemoteProducer connection : producers) {
connection.start();
}
- for (BrokerConnection connection : brokerConns) {
+ for (BrokerConnection connection : brokerConnections) {
connection.start();
}
}
@@ -152,6 +120,8 @@
RemoteConnection connection = new RemoteConnection();
connection.setBroker(this);
connection.setTransport(transport);
+ connection.setPriorityLevels(MockBrokerTest.PRIORITY_LEVELS);
+ connection.setDispatcher(dispatcher);
try {
connection.start();
} catch (Exception e1) {
@@ -160,6 +130,36 @@
}
public void onAcceptError(Exception error) {
+ System.out.println("Accept error: "+error);
error.printStackTrace();
}
+
+ public IDispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public void setDispatcher(IDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ public String getUri() {
+ return uri;
+ }
+
+ public void setUri(String uri) {
+ this.uri = uri;
+ }
+
+ public URI getConnectURI() {
+ return transportServer.getConnectURI();
+ }
+
+ public boolean isStopping() {
+ return stopping.get();
+ }
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Thu Feb 12 19:05:05 2009
@@ -19,18 +19,12 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.dispatch.PriorityPooledDispatcher;
-import org.apache.activemq.flow.FlowController;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.Period;
import org.apache.activemq.queue.Mapper;
@@ -50,10 +44,10 @@
private boolean multibroker = false;
// Set to mockup up ptp:
- boolean ptp = true;
+ boolean ptp = false;
// Can be set to BLOCKING, POLLING or ASYNC
- final int dispatchMode = AbstractTestConnection.ASYNC;
+ public final static int DISPATCH_MODE = AbstractTestConnection.ASYNC;
// Set's the number of threads to use:
private final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors();
boolean usePartitionedQueue = false;
@@ -77,7 +71,6 @@
}
final AtomicLong msgIdGenerator = new AtomicLong();
- final AtomicInteger prodcuerIdGenerator = new AtomicInteger();
class BrokerConnection extends AbstractTestConnection implements DeliveryTarget {
private final Pipe<Message> pipe;
@@ -148,28 +141,6 @@
}
};
- class Router {
- final HashMap<Destination, Collection<DeliveryTarget>> lookupTable = new HashMap<Destination, Collection<DeliveryTarget>>();
-
- final synchronized void bind(DeliveryTarget dt, Destination destination) {
- Collection<DeliveryTarget> targets = lookupTable.get(destination);
- if (targets == null) {
- targets = new ArrayList<DeliveryTarget>();
- lookupTable.put(destination, targets);
- }
- targets.add(dt);
- }
-
- final void route(ISourceController<Message> source, Message msg) {
- Collection<DeliveryTarget> targets = lookupTable.get(msg.getDestination());
- for (DeliveryTarget dt : targets) {
- if (dt.match(msg)) {
- dt.getSink().add(msg, source);
- }
- }
- }
- }
-
private void reportRates() throws InterruptedException {
System.out.println("Checking rates for test: " + getName());
for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
@@ -195,9 +166,9 @@
consumerCount = 1;
createConnections();
- LocalProducer producer = sendBroker.producers.get(0);
- producer.msgPriority = 1;
- producer.producerRate.setName("High Priority Producer Rate");
+ RemoteProducer producer = sendBroker.producers.get(0);
+ producer.setPriority(1);
+ producer.getRate().setName("High Priority Producer Rate");
rcvBroker.consumers.get(0).setThinkTime(1);
@@ -209,7 +180,7 @@
for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
Period p = new Period();
Thread.sleep(1000 * 5);
- System.out.println(producer.producerRate.getRateSummary(p));
+ System.out.println(producer.getRate().getRateSummary(p));
System.out.println(totalProducerRate.getRateSummary(p));
System.out.println(totalConsumerRate.getRateSummary(p));
totalProducerRate.reset();
@@ -233,10 +204,10 @@
consumerCount = 1;
createConnections();
- LocalProducer producer = sendBroker.producers.get(0);
- producer.msgPriority = 1;
- producer.priorityMod = 3;
- producer.producerRate.setName("High Priority Producer Rate");
+ RemoteProducer producer = sendBroker.producers.get(0);
+ producer.setPriority(1);
+ producer.setPriorityMod(3);
+ producer.getRate().setName("High Priority Producer Rate");
rcvBroker.consumers.get(0).setThinkTime(1);
@@ -248,7 +219,7 @@
for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
Period p = new Period();
Thread.sleep(1000 * 5);
- System.out.println(producer.producerRate.getRateSummary(p));
+ System.out.println(producer.getRate().getRateSummary(p));
System.out.println(totalProducerRate.getRateSummary(p));
System.out.println(totalConsumerRate.getRateSummary(p));
totalProducerRate.reset();
@@ -389,7 +360,9 @@
// Add properties to match producers to their consumers
for (int i = 0; i < consumerCount; i++) {
- rcvBroker.consumers.get(i).selector = sendBroker.producers.get(i).property = "match" + i;
+ String property = "match" + i;
+ rcvBroker.consumers.get(i).setSelector(property);
+ sendBroker.producers.get(i).setProperty(property);
}
// Start 'em up.
@@ -400,21 +373,21 @@
stopServices();
}
}
-
+
private void createConnections() throws IOException, URISyntaxException {
- if (dispatchMode == AbstractTestConnection.ASYNC || dispatchMode == AbstractTestConnection.POLLING) {
+ if (DISPATCH_MODE == AbstractTestConnection.ASYNC || DISPATCH_MODE == AbstractTestConnection.POLLING) {
dispatcher = new PriorityPooledDispatcher("BrokerDispatcher", asyncThreadPoolSize, Message.MAX_PRIORITY);
FlowController.setFlowExecutor(dispatcher.createPriorityExecutor(Message.MAX_PRIORITY));
}
if (multibroker) {
- sendBroker = new MockBroker(this, "SendBroker");
- rcvBroker = new MockBroker(this, "RcvBroker");
+ sendBroker = createBroker("SendBroker", "tcp://localhost:10000?wireFormat=test");
brokers.add(sendBroker);
+ rcvBroker = createBroker("RcvBroker", "tcp://localhost:20000?wireFormat=test");
brokers.add(rcvBroker);
} else {
- sendBroker = rcvBroker = new MockBroker(this, "Broker");
+ sendBroker = rcvBroker = createBroker("Broker", "tcp://localhost:10000?wireFormat=test");
brokers.add(sendBroker);
}
@@ -423,26 +396,72 @@
for (int i = 0; i < destCount; i++) {
dests[i] = new Destination("dest" + (i + 1), ptp);
if (ptp) {
- sendBroker.createQueue(dests[i]);
+ MockQueue queue = createQueue(sendBroker, dests[i]);
+ sendBroker.addQueue(queue);
if (multibroker) {
- rcvBroker.createQueue(dests[i]);
+ queue = createQueue(rcvBroker, dests[i]);
+ rcvBroker.addQueue(queue);
}
}
}
for (int i = 0; i < producerCount; i++) {
- sendBroker.createProducerConnection(dests[i % destCount]);
+ Destination destination = dests[i % destCount];
+ RemoteProducer producer = createProducer(i, destination);
+ sendBroker.producers.add(producer);
}
+
for (int i = 0; i < consumerCount; i++) {
- rcvBroker.createConsumerConnection(dests[i % destCount]);
+ Destination destination = dests[i % destCount];
+ RemoteConsumer consumer = createConsumer(i, destination);
+ sendBroker.consumers.add(consumer);
}
// Create MultiBroker connections:
- if (multibroker) {
- Pipe<Message> pipe = new Pipe<Message>();
- sendBroker.createBrokerConnection(rcvBroker, pipe);
- rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
- }
+// if (multibroker) {
+// Pipe<Message> pipe = new Pipe<Message>();
+// sendBroker.createBrokerConnection(rcvBroker, pipe);
+// rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
+// }
+ }
+
+ private RemoteConsumer createConsumer(int i, Destination destination) {
+ RemoteConsumer consumer = new RemoteConsumer();
+ consumer.setBroker(rcvBroker);
+ consumer.setDestination(destination);
+ consumer.setName("consumer"+(i+1));
+ consumer.setTotalConsumerRate(totalConsumerRate);
+ return consumer;
+ }
+
+ private RemoteProducer createProducer(int id, Destination destination) {
+ RemoteProducer producer = new RemoteProducer();
+ producer.setBroker(sendBroker);
+ producer.setProducerId(id+1);
+ producer.setName("producer" +(id+1));
+ producer.setDestination(destination);
+ producer.setMessageIdGenerator(msgIdGenerator);
+ producer.setTotalProducerRate(totalProducerRate);
+ return producer;
+ }
+
+ private MockQueue createQueue(MockBroker broker, Destination destination) {
+ MockQueue queue = new MockQueue();
+ queue.setBroker(broker);
+ queue.setDestination(destination);
+ queue.setKeyExtractor(keyExtractor);
+ if( usePartitionedQueue ) {
+ queue.setPartitionMapper(partitionMapper);
+ }
+ return queue;
+ }
+
+ private MockBroker createBroker(String name, String uri) {
+ MockBroker broker = new MockBroker();
+ broker.setName(name);
+ broker.setUri(uri);
+ broker.setDispatcher(dispatcher);
+ return broker;
}
private void stopServices() throws Exception {
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Thu Feb 12 19:05:05 2009
@@ -7,6 +7,7 @@
import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.Mapper;
import org.apache.activemq.queue.PartitionedQueue;
import org.apache.activemq.queue.SharedPriorityQueue;
import org.apache.activemq.queue.SharedQueue;
@@ -14,30 +15,24 @@
class MockQueue implements MockBrokerTest.DeliveryTarget {
- private final MockBrokerTest mockBrokerTest;
HashMap<DeliveryTarget, Subscription<Message>> subs = new HashMap<DeliveryTarget, Subscription<Message>>();
- private final Destination destination;
- private final IQueue<Long, Message> queue;
- private final MockBroker broker;
-
- MockQueue(MockBrokerTest mockBrokerTest, MockBroker broker, Destination destination) {
- this.mockBrokerTest = mockBrokerTest;
- this.broker = broker;
- this.destination = destination;
- this.queue = createQueue();
- broker.router.bind(this, destination);
- }
+ private Destination destination;
+ private IQueue<Long, Message> queue;
+ private MockBroker broker;
+
+ private Mapper<Integer, Message> partitionMapper;
+ private Mapper<Long, Message> keyExtractor;
private IQueue<Long, Message> createQueue() {
- if (this.mockBrokerTest.usePartitionedQueue) {
+ if (partitionMapper!=null) {
PartitionedQueue<Integer, Long, Message> queue = new PartitionedQueue<Integer, Long, Message>() {
@Override
protected IQueue<Long, Message> cratePartition(Integer partitionKey) {
return createSharedFlowQueue();
}
};
- queue.setPartitionMapper(this.mockBrokerTest.partitionMapper);
+ queue.setPartitionMapper(partitionMapper);
queue.setResourceName(destination.getName());
return queue;
} else {
@@ -46,33 +41,28 @@
}
private IQueue<Long, Message> createSharedFlowQueue() {
- if (broker.priorityLevels > 1) {
- PrioritySizeLimiter<Message> limiter = new PrioritySizeLimiter<Message>(100, 1, broker.priorityLevels);
+ if (MockBrokerTest.PRIORITY_LEVELS > 1) {
+ PrioritySizeLimiter<Message> limiter = new PrioritySizeLimiter<Message>(100, 1, MockBrokerTest.PRIORITY_LEVELS);
limiter.setPriorityMapper(Message.PRIORITY_MAPPER);
SharedPriorityQueue<Long, Message> queue = new SharedPriorityQueue<Long, Message>(destination.getName(), limiter);
- queue.setKeyMapper(this.mockBrokerTest.keyExtractor);
+ queue.setKeyMapper(keyExtractor);
queue.setAutoRelease(true);
- queue.setDispatcher(broker.dispatcher);
+ queue.setDispatcher(broker.getDispatcher());
return queue;
} else {
SizeLimiter<Message> limiter = new SizeLimiter<Message>(100, 1);
SharedQueue<Long, Message> queue = new SharedQueue<Long, Message>(destination.getName(), limiter);
- queue.setKeyMapper(this.mockBrokerTest.keyExtractor);
+ queue.setKeyMapper(keyExtractor);
queue.setAutoRelease(true);
- queue.setDispatcher(broker.dispatcher);
+ queue.setDispatcher(broker.getDispatcher());
return queue;
}
}
public final void deliver(ISourceController<Message> source, Message msg) {
-
queue.add(msg, source);
}
-
- public String getSelector() {
- return null;
- }
-
+
public final Destination getDestination() {
return destination;
}
@@ -113,6 +103,7 @@
}
public void start() throws Exception {
+ queue = createQueue();
}
public void stop() throws Exception {
@@ -126,4 +117,32 @@
return true;
}
+ public MockBroker getBroker() {
+ return broker;
+ }
+
+ public void setBroker(MockBroker broker) {
+ this.broker = broker;
+ }
+
+ public Mapper<Integer, Message> getPartitionMapper() {
+ return partitionMapper;
+ }
+
+ public void setPartitionMapper(Mapper<Integer, Message> partitionMapper) {
+ this.partitionMapper = partitionMapper;
+ }
+
+ public Mapper<Long, Message> getKeyExtractor() {
+ return keyExtractor;
+ }
+
+ public void setKeyExtractor(Mapper<Long, Message> keyExtractor) {
+ this.keyExtractor = keyExtractor;
+ }
+
+ public void setDestination(Destination destination) {
+ this.destination = destination;
+ }
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java Thu Feb 12 19:05:05 2009
@@ -40,7 +40,6 @@
public void setBroker(MockBroker broker) {
this.broker = broker;
-
}
public void setTransport(Transport transport) {
@@ -48,15 +47,57 @@
}
public void start() throws Exception {
+ transport.setTransportListener(this);
+ transport.start();
+ }
+ public void stop() throws Exception {
+ stopping.set(true);
+ writer.shutdown();
+ if (transport != null) {
+ transport.stop();
+ }
+ }
+
+ public void onCommand(Object command) {
+ try {
+ // First command in should be the name of the connection
+ if( name==null ) {
+ name = (String) command;
+ initialize();
+ } else if (command.getClass() == Message.class) {
+ Message msg = (Message) command;
+ // Use the flow controller to send the message on so that we do
+ // not overflow
+ // the broker.
+ while (!inboundController.offer(msg, null)) {
+ inboundController.waitForFlowUnblock();
+ }
+ } else if (command.getClass() == Destination.class) {
+ // This is a subscription request
+ Destination destination = (Destination) command;
+ broker.subscribe(destination, this);
+ }
+ } catch (Exception e) {
+ onException(e);
+ }
+ }
+
+ private void initialize() {
// Setup the input processing..
SizeLimiter<Message> limiter = new SizeLimiter<Message>(inputWindowSize, inputResumeThreshold);
Flow flow = new Flow(name + "-inbound", false);
inboundController = new FlowController<Message>(new FlowControllable<Message>() {
public void flowElemAccepted(ISourceController<Message> controller, Message elem) {
broker.router.route(controller, elem);
+ inboundController.elementDispatched(elem);
}
+ @Override
+ public String toString() {
+ return name;
+ }
+
public IFlowSink<Message> getFlowSink() {
return null;
}
@@ -73,7 +114,7 @@
ExclusiveQueue<Message> queue = new ExclusiveQueue<Message>(flow, flow.getFlowName(), limiter);
this.output = queue;
} else {
- ExclusivePriorityQueue<Message> t = new ExclusivePriorityQueue<Message>(broker.priorityLevels, flow, name + "-outbound", outputWindowSize, outputResumeThreshold);
+ ExclusivePriorityQueue<Message> t = new ExclusivePriorityQueue<Message>(priorityLevels, flow, name + "-outbound", outputWindowSize, outputResumeThreshold);
t.setPriorityMapper(Message.PRIORITY_MAPPER);
this.output = t;
}
@@ -98,47 +139,15 @@
});
}
});
-
- transport.setTransportListener(this);
- transport.start();
- }
-
- public void stop() throws Exception {
- stopping.set(true);
- writer.shutdown();
- if (transport != null) {
- transport.stop();
- }
- }
-
- public void onCommand(Object command) {
- try {
- if (command.getClass() == Message.class) {
- Message msg = (Message) command;
- // Use the flow controller to send the message on so that we do
- // not overflow
- // the broker.
- while (!inboundController.offer(msg, null)) {
- inboundController.waitForFlowUnblock();
- }
- } else if (command.getClass() == Destination.class) {
- // This is a subscription request
- Destination destination = (Destination) command;
- broker.subscribe(destination, this);
- }
- } catch (Exception e) {
- onException(e);
- }
}
public void onException(IOException error) {
- if (!stopping.get()) {
- error.printStackTrace();
- }
+ onException((Exception)error);
}
public void onException(Exception error) {
- if (!stopping.get()) {
+ if (!stopping.get() && !broker.isStopping()) {
+ System.out.println("RemoteConnection error: "+error);
error.printStackTrace();
}
}
@@ -153,10 +162,6 @@
return name;
}
- public void setName(String name) {
- this.name = name;
- }
-
public int getPriorityLevels() {
return priorityLevels;
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java Thu Feb 12 19:05:05 2009
@@ -21,16 +21,19 @@
private MetricAggregator totalConsumerRate;
private long thinkTime;
private Destination destination;
+ private String selector;
public void start() throws Exception {
consumerRate.name("Consumer " + name + " Rate");
totalConsumerRate.add(consumerRate);
- URI uri = broker.transportServer.getConnectURI();
- transport = TransportFactory.connect(uri);
+ URI uri = broker.getConnectURI();
+ transport = TransportFactory.compositeConnect(uri);
transport.setTransportListener(this);
transport.start();
+ // Let the remote side know our name.
+ transport.oneway(name);
// Sending the destination acts as the subscribe.
transport.oneway(destination);
}
@@ -61,6 +64,7 @@
public void onException(IOException error) {
if( !stopping.get() ) {
+ System.out.println("RemoteConsumer error: "+error);
error.printStackTrace();
}
}
@@ -77,15 +81,6 @@
this.broker = broker;
}
- public Transport getTransport() {
- return transport;
- }
-
- public void setTransport(Transport transport) {
- this.transport = transport;
- }
-
-
public MockBroker getBroker() {
return broker;
}
@@ -98,8 +93,8 @@
return totalConsumerRate;
}
- public void setTotalConsumerRate(MetricAggregator totalProducerRate) {
- this.totalConsumerRate = totalProducerRate;
+ public void setTotalConsumerRate(MetricAggregator totalConsumerRate) {
+ this.totalConsumerRate = totalConsumerRate;
}
public Destination getDestination() {
@@ -108,4 +103,24 @@
public void setDestination(Destination destination) {
this.destination = destination;
+ }
+
+ public long getThinkTime() {
+ return thinkTime;
+ }
+
+ public void setThinkTime(long thinkTime) {
+ this.thinkTime = thinkTime;
+ }
+
+ public MetricCounter getConsumerRate() {
+ return consumerRate;
+ }
+
+ public String getSelector() {
+ return selector;
+ }
+
+ public void setSelector(String selector) {
+ this.selector = selector;
}}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Thu Feb 12 19:05:05 2009
@@ -14,7 +14,7 @@
public class RemoteProducer implements TransportListener, Runnable {
private final AtomicBoolean stopping = new AtomicBoolean();
- private final MetricCounter producerRate = new MetricCounter();
+ private final MetricCounter rate = new MetricCounter();
private Transport transport;
private MockBroker broker;
@@ -30,14 +30,17 @@
private MetricAggregator totalProducerRate;
public void start() throws Exception {
- producerRate.name("Producer " + name + " Rate");
- totalProducerRate.add(producerRate);
+ rate.name("Producer " + name + " Rate");
+ totalProducerRate.add(rate);
- URI uri = broker.transportServer.getConnectURI();
- transport = TransportFactory.connect(uri);
+ URI uri = broker.getConnectURI();
+ transport = TransportFactory.compositeConnect(uri);
transport.setTransportListener(this);
transport.start();
+ // Let the remote side know our name.
+ transport.oneway(name);
+
thread = new Thread(this, name);
thread.start();
}
@@ -46,9 +49,9 @@
stopping.set(true);
if( transport!=null ) {
transport.stop();
- transport=null;
}
thread.join();
+ transport=null;
}
public void run() {
@@ -66,6 +69,7 @@
}
transport.oneway(next);
+ rate.increment();
}
} catch (IOException e) {
onException(e);
@@ -78,6 +82,7 @@
public void onException(IOException error) {
if( !stopping.get() ) {
+ System.out.println("RemoteProducer error: "+error);
error.printStackTrace();
}
}
@@ -94,14 +99,6 @@
this.broker = broker;
}
- public Transport getTransport() {
- return transport;
- }
-
- public void setTransport(Transport transport) {
- this.transport = transport;
- }
-
public AtomicLong getMessageIdGenerator() {
return messageIdGenerator;
}
@@ -126,14 +123,6 @@
this.priorityMod = priorityMod;
}
- public int getCounter() {
- return counter;
- }
-
- public void setCounter(int msgCounter) {
- this.counter = msgCounter;
- }
-
public int getProducerId() {
return producerId;
}
@@ -172,4 +161,8 @@
public void setTotalProducerRate(MetricAggregator totalProducerRate) {
this.totalProducerRate = totalProducerRate;
+ }
+
+ public MetricCounter getRate() {
+ return rate;
}}
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java?rev=743841&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java Thu Feb 12 19:05:05 2009
@@ -0,0 +1,59 @@
+/**
+ *
+ */
+package org.apache.activemq.flow;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
+
+/**
+ * Maps each Destination to the delivery targets bound to it and hands
+ * incoming messages to every bound target that matches.
+ */
+public class Router {
+    /** Bound targets per destination; entries are created on first bind. */
+    final HashMap<Destination, Collection<DeliveryTarget>> lookupTable = new HashMap<Destination, Collection<DeliveryTarget>>();
+
+    /**
+     * Registers a delivery target for a destination.
+     *
+     * @param dt the target to add to the destination's target list
+     * @param destination the destination whose messages the target wants
+     */
+    final synchronized void bind(DeliveryTarget dt, Destination destination) {
+        Collection<DeliveryTarget> targets = lookupTable.get(destination);
+        if (targets == null) {
+            targets = new ArrayList<DeliveryTarget>();
+            lookupTable.put(destination, targets);
+        }
+        targets.add(dt);
+    }
+
+    /**
+     * Adds a message to the sink of every target bound to its destination
+     * whose match(msg) returns true. A message for a destination with no
+     * bound targets is dropped.
+     *
+     * NOTE(review): bind() is synchronized but this method reads the table
+     * without the lock; confirm callers never bind while routing.
+     *
+     * @param source the controller passed through to each target's sink
+     * @param msg the message to route
+     */
+    final void route(ISourceController<Message> source, Message msg) {
+        Collection<DeliveryTarget> targets = lookupTable.get(msg.getDestination());
+        if (targets == null) {
+            // Nothing has been bound to this destination yet; without this
+            // guard the loop below would throw a NullPointerException.
+            return;
+        }
+        for (DeliveryTarget dt : targets) {
+            if (dt.match(msg)) {
+                dt.getSink().add(msg, source);
+            }
+        }
+    }
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestFlowManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestFlowManager.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestFlowManager.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestFlowManager.java Thu Feb 12 19:05:05 2009
@@ -23,11 +23,11 @@
import org.apache.activemq.queue.MultiFlowQueue;
public class TestFlowManager {
- synchronized <T> IFlowQueue<T> createQueue(String name, Flow flow, int capacity, int resumeThreshold) {
+ static <T> IFlowQueue<T> createQueue(String name, Flow flow, int capacity, int resumeThreshold) {
return createFlowQueue(flow, name, capacity, resumeThreshold);
}
- public synchronized <T> IFlowQueue<T> createFlowQueue(Flow flow, String name, int capacity, int resumeThreshold) {
+ static public <T> IFlowQueue<T> createFlowQueue(Flow flow, String name, int capacity, int resumeThreshold) {
IFlowQueue<T> queue;
if (flow != null) {
queue = new ExclusiveQueue<T>(flow, name, new SizeLimiter<T>(capacity, resumeThreshold));
@@ -37,7 +37,7 @@
return queue;
}
- public Flow createFlow(String name) {
+ static public Flow createFlow(String name) {
Flow rc = new Flow(name, false);
return rc;
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java?rev=743841&r1=743840&r2=743841&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java Thu Feb 12 19:05:05 2009
@@ -1,12 +1,12 @@
package org.apache.activemq.flow;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.io.OutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
@@ -15,17 +15,25 @@
public class TestWireFormatFactory implements WireFormatFactory {
- public class TestWireFormat implements WireFormat {
+ static public class TestWireFormat implements WireFormat {
public void marshal(Object value, DataOutput out) throws IOException {
- ObjectOutputStream oos = new ObjectOutputStream((OutputStream) out);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(value);
- oos.reset();
- oos.flush();
+ oos.close();
+
+ byte[] data = baos.toByteArray();
+ out.writeInt(data.length);
+ out.write(data);
}
public Object unmarshal(DataInput in) throws IOException {
- ObjectInputStream ois = new ObjectInputStream((InputStream) in);
+ byte data[] = new byte[in.readInt()];
+ in.readFully(data);
+
+ ByteArrayInputStream is = new ByteArrayInputStream(data);
+ ObjectInputStream ois = new ObjectInputStream(is);
try {
return ois.readObject();
} catch (ClassNotFoundException e) {
@@ -53,6 +61,6 @@
public WireFormat createWireFormat() {
return new TestWireFormat();
- }
+ }
}