You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/12 19:04:50 UTC
svn commit: r752957 [2/2] - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/dispatch/ main/java/org/apache/activemq/flow/
main/java/org/apache/activemq/queue/
main/java/org/apache/activemq/transport/ test/java/org/apache/activemq/...
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Thu Mar 12 18:04:49 2009
@@ -16,10 +16,7 @@
*/
package org.apache.activemq.flow;
-import java.io.IOException;
-import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
@@ -28,21 +25,19 @@
import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.flow.Commands.Destination.DestinationBean;
import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
-import org.apache.activemq.metric.MetricAggregator;
-import org.apache.activemq.metric.Period;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.queue.Mapper;
public class MockBrokerTest extends TestCase {
- protected static final int PERFORMANCE_SAMPLES = 3;
+ protected static final int PERFORMANCE_SAMPLES = 5;
+ protected static final int SAMPLING_FREQUENCY = 5;
- protected static final int IO_WORK_AMOUNT = 0;
protected static final int FANIN_COUNT = 10;
protected static final int FANOUT_COUNT = 10;
protected static final int PRIORITY_LEVELS = 10;
- protected static final boolean USE_INPUT_QUEUES = true;
+ protected static final boolean USE_INPUT_QUEUES = false;
// Set to put senders and consumers on separate brokers.
protected boolean multibroker = false;
@@ -59,21 +54,16 @@
protected String receiveBrokerURI;
// Set's the number of threads to use:
- protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors();
+ protected static final boolean SEPARATE_CLIENT_DISPATCHER = false;
+ protected final int threadsPerDispatcher = Runtime.getRuntime().availableProcessors();
protected boolean usePartitionedQueue = false;
- protected int producerCount;
- protected int consumerCount;
- protected int destCount;
-
- protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
- protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
-
+ protected ArrayList<MockBroker> brokers = new ArrayList<MockBroker>();
protected MockBroker sendBroker;
protected MockBroker rcvBroker;
- protected ArrayList<MockBroker> brokers = new ArrayList<MockBroker>();
+ protected MockClient client;
+
protected IDispatcher dispatcher;
- protected final AtomicLong msgIdGenerator = new AtomicLong();
static public final Mapper<Long, Message> KEY_MAPPER = new Mapper<Long, Message>() {
public Long map(Message element) {
@@ -90,8 +80,6 @@
@Override
protected void setUp() throws Exception {
- dispatcher = createDispatcher();
- dispatcher.start();
if (tcp) {
sendBrokerURI = "tcp://localhost:10000?wireFormat=proto";
receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto";
@@ -106,119 +94,79 @@
}
}
- protected IDispatcher createDispatcher() {
- return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, asyncThreadPoolSize);
+ protected IDispatcher createDispatcher(String name) {
+ return PriorityDispatcher.createPriorityDispatchPool(name, Message.MAX_PRIORITY, threadsPerDispatcher);
}
-
+
public void test_1_1_0() throws Exception {
- producerCount = 1;
- destCount = 1;
- createConnections();
+ client = new MockClient();
+ client.setNumProducers(1);
+ client.setDestCount(1);
+ client.setNumConsumers(0);
- // Start 'em up.
- startServices();
- try {
- reportRates();
- } finally {
- stopServices();
- }
+ createConnections(1);
+ runTestCase();
}
public void test_1_1_1() throws Exception {
- producerCount = 1;
- destCount = 1;
- consumerCount = 1;
-
- createConnections();
+ client = new MockClient();
+ client.setNumProducers(1);
+ client.setDestCount(1);
+ client.setNumConsumers(1);
- // Start 'em up.
- startServices();
- try {
- reportRates();
- } finally {
- stopServices();
- }
+ createConnections(1);
+ runTestCase();
}
public void test_10_10_10() throws Exception {
- producerCount = FANIN_COUNT;
- destCount = FANIN_COUNT;
- consumerCount = FANOUT_COUNT;
-
- createConnections();
+ client = new MockClient();
+ client.setNumProducers(FANIN_COUNT);
+ client.setDestCount(FANIN_COUNT);
+ client.setNumConsumers(FANOUT_COUNT);
- // Start 'em up.
- startServices();
- try {
- reportRates();
- } finally {
- stopServices();
- }
+ createConnections(FANIN_COUNT);
+ runTestCase();
}
public void test_10_1_10() throws Exception {
- producerCount = FANIN_COUNT;
- consumerCount = FANOUT_COUNT;
- destCount = 1;
+ client = new MockClient();
+ client.setNumProducers(FANIN_COUNT);
+ client.setDestCount(1);
+ client.setNumConsumers(FANOUT_COUNT);
- createConnections();
-
- // Start 'em up.
- startServices();
- try {
- reportRates();
- } finally {
- stopServices();
- }
+ createConnections(1);
+ runTestCase();
}
public void test_10_1_1() throws Exception {
- producerCount = FANIN_COUNT;
- destCount = 1;
- consumerCount = 1;
+ client = new MockClient();
+ client.setNumProducers(FANIN_COUNT);
+ client.setDestCount(1);
+ client.setNumConsumers(1);
- createConnections();
-
- // Start 'em up.
- startServices();
- try {
- reportRates();
- } finally {
- stopServices();
- }
+ createConnections(1);
+ runTestCase();
}
public void test_1_1_10() throws Exception {
- producerCount = 1;
- destCount = 1;
- consumerCount = FANOUT_COUNT;
-
- createConnections();
+ client = new MockClient();
+ client.setNumProducers(1);
+ client.setDestCount(1);
+ client.setNumConsumers(FANOUT_COUNT);
- // Start 'em up.
- startServices();
- try {
- reportRates();
- } finally {
- stopServices();
- }
+ createConnections(1);
+ runTestCase();
}
public void test_2_2_2() throws Exception {
- producerCount = 2;
- destCount = 2;
- consumerCount = 2;
-
- createConnections();
+ client = new MockClient();
+ client.setNumProducers(2);
+ client.setDestCount(2);
+ client.setNumConsumers(2);
- // Start 'em up.
- startServices();
- try {
- reportRates();
- } finally {
- stopServices();
- }
+ createConnections(2);
+ runTestCase();
}
/**
@@ -229,43 +177,33 @@
* @throws Exception
*/
public void test_2_2_2_SlowConsumer() throws Exception {
- producerCount = 2;
- destCount = 2;
- consumerCount = 2;
+ client = new MockClient();
+ client.setNumProducers(2);
+ client.setDestCount(2);
+ client.setNumConsumers(2);
+
+ createConnections(2);
+ client.consumer(0).setThinkTime(50);
+ runTestCase();
- createConnections();
- rcvBroker.consumers.get(0).setThinkTime(50);
-
- // Start 'em up.
- startServices();
- try {
- reportRates();
- } finally {
- stopServices();
- }
}
public void test_2_2_2_Selector() throws Exception {
- producerCount = 2;
- destCount = 2;
- consumerCount = 2;
+ client = new MockClient();
+ client.setNumProducers(2);
+ client.setDestCount(2);
+ client.setNumConsumers(2);
- createConnections();
+ createConnections(2);
// Add properties to match producers to their consumers
- for (int i = 0; i < consumerCount; i++) {
+ for (int i = 0; i < 2; i++) {
String property = "match" + i;
- rcvBroker.consumers.get(i).setSelector(property);
- sendBroker.producers.get(i).setProperty(property);
+ client.consumer(i).setSelector(property);
+ client.producer(i).setProperty(property);
}
- // Start 'em up.
- startServices();
- try {
- reportRates();
- } finally {
- stopServices();
- }
+ runTestCase();
}
/**
@@ -276,35 +214,20 @@
*/
public void test_2_1_1_HighPriorityProducer() throws Exception {
- producerCount = 2;
- destCount = 1;
- consumerCount = 1;
-
- createConnections();
- RemoteProducer producer = sendBroker.producers.get(0);
+ client = new MockClient();
+ client.setNumProducers(2);
+ client.setNumConsumers(1);
+ client.setDestCount(1);
+
+ createConnections(1);
+ RemoteProducer producer = client.producer(0);
+ client.includeInRateReport(producer);
producer.setPriority(1);
producer.getRate().setName("High Priority Producer Rate");
- rcvBroker.consumers.get(0).setThinkTime(1);
+ client.consumer(0).setThinkTime(1);
- // Start 'em up.
- startServices();
- try {
-
- System.out.println("Checking rates for test: " + getName());
- for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
- Period p = new Period();
- Thread.sleep(1000 * 5);
- System.out.println(producer.getRate().getRateSummary(p));
- System.out.println(totalProducerRate.getRateSummary(p));
- System.out.println(totalConsumerRate.getRateSummary(p));
- totalProducerRate.reset();
- totalConsumerRate.reset();
- }
-
- } finally {
- stopServices();
- }
+ runTestCase();
}
/**
@@ -314,53 +237,26 @@
* @throws Exception
*/
public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
- producerCount = 2;
- destCount = 1;
- consumerCount = 1;
+ client = new MockClient();
+ client.setNumProducers(2);
+ client.setNumConsumers(1);
+ client.setDestCount(1);
- createConnections();
- RemoteProducer producer = sendBroker.producers.get(0);
+ createConnections(1);
+ RemoteProducer producer = client.producer(0);
producer.setPriority(1);
producer.setPriorityMod(3);
producer.getRate().setName("High Priority Producer Rate");
- rcvBroker.consumers.get(0).setThinkTime(1);
-
- // Start 'em up.
- startServices();
- try {
-
- System.out.println("Checking rates for test: " + getName());
- for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
- Period p = new Period();
- Thread.sleep(1000 * 5);
- System.out.println(producer.getRate().getRateSummary(p));
- System.out.println(totalProducerRate.getRateSummary(p));
- System.out.println(totalConsumerRate.getRateSummary(p));
- totalProducerRate.reset();
- totalConsumerRate.reset();
- }
-
- } finally {
- stopServices();
- }
+ client.consumer(0).setThinkTime(1);
+ runTestCase();
}
- private void reportRates() throws InterruptedException {
- System.out.println("Checking rates for test: " + getName() + ", " + (ptp ? "ptp" : "topic"));
- for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
- Period p = new Period();
- Thread.sleep(1000 * 5);
- System.out.println(totalProducerRate.getRateSummary(p));
- System.out.println(totalConsumerRate.getRateSummary(p));
- totalProducerRate.reset();
- totalConsumerRate.reset();
- }
- }
+ private void createConnections(int destCount) throws Exception {
- private void createConnections() throws IOException, URISyntaxException {
+ dispatcher = createDispatcher("BrokerDispatcher");
+ dispatcher.start();
- FlowController.setFlowExecutor(dispatcher.createPriorityExecutor(Message.MAX_PRIORITY));
if (multibroker) {
sendBroker = createBroker("SendBroker", sendBrokerURI);
rcvBroker = createBroker("RcvBroker", receiveBrokerURI);
@@ -388,46 +284,27 @@
}
}
- for (int i = 0; i < producerCount; i++) {
- Destination destination = dests[i % destCount];
- RemoteProducer producer = createProducer(i, destination);
- sendBroker.producers.add(producer);
- }
-
- for (int i = 0; i < consumerCount; i++) {
- Destination destination = dests[i % destCount];
- RemoteConsumer consumer = createConsumer(i, destination);
- sendBroker.consumers.add(consumer);
- }
-
- // Create MultiBroker connections:
- // if (multibroker) {
- // Pipe<Message> pipe = new Pipe<Message>();
- // sendBroker.createBrokerConnection(rcvBroker, pipe);
- // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
- // }
- }
-
- private RemoteConsumer createConsumer(int i, Destination destination) {
- RemoteConsumer consumer = new RemoteConsumer();
- consumer.setBroker(rcvBroker);
- consumer.setDestination(destination);
- consumer.setName("consumer" + (i + 1));
- consumer.setTotalConsumerRate(totalConsumerRate);
- consumer.setDispatcher(dispatcher);
- return consumer;
- }
-
- private RemoteProducer createProducer(int id, Destination destination) {
- RemoteProducer producer = new RemoteProducer();
- producer.setBroker(sendBroker);
- producer.setProducerId(id + 1);
- producer.setName("producer" + (id + 1));
- producer.setDestination(destination);
- producer.setMessageIdGenerator(msgIdGenerator);
- producer.setTotalProducerRate(totalProducerRate);
- producer.setDispatcher(dispatcher);
- return producer;
+ IDispatcher clientDispatcher = null;
+ if (SEPARATE_CLIENT_DISPATCHER) {
+ clientDispatcher = createDispatcher("ClientDispatcher");
+ clientDispatcher.start();
+ } else {
+ clientDispatcher = dispatcher;
+ }
+
+ // Configure Client:
+ client.setDispatcher(clientDispatcher);
+ client.setNumPriorities(PRIORITY_LEVELS);
+ client.setSendBrokerURI(sendBroker.getUri());
+ client.setReceiveBrokerURI(rcvBroker.getUri());
+ client.setPerformanceSamples(PERFORMANCE_SAMPLES);
+ client.setSamplingFrequency(1000 * SAMPLING_FREQUENCY);
+ client.setThreadsPerDispatcher(threadsPerDispatcher);
+ client.setUseInputQueues(USE_INPUT_QUEUES);
+ client.setPtp(ptp);
+ client.setTestName(getName());
+
+ client.createConnections();
}
private MockQueue createQueue(MockBroker broker, Destination destination) {
@@ -446,23 +323,37 @@
broker.setName(name);
broker.setUri(uri);
broker.setDispatcher(dispatcher);
+ broker.setUseInputQueues(USE_INPUT_QUEUES);
return broker;
}
+ private void runTestCase() throws Exception {
+ // Start 'em up.
+ startServices();
+ try {
+ client.runTest();
+ } finally {
+ stopServices();
+ }
+ }
+
private void stopServices() throws Exception {
+
for (MockBroker broker : brokers) {
broker.stopServices();
}
+
+ client.getDispatcher().shutdown();
if (dispatcher != null) {
dispatcher.shutdown();
}
}
private void startServices() throws Exception {
+
for (MockBroker broker : brokers) {
broker.startServices();
}
- //SelectorManager.SINGLETON.setChannelExecutor(dispatcher.createPriorityExecutor(PRIORITY_LEVELS));
}
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java Thu Mar 12 18:04:49 2009
@@ -87,7 +87,7 @@
public void oneway(Object command) throws IOException {
try {
- if( wireFormat!=null ) {
+ if (wireFormat != null) {
pipe.write(wireFormat.marshal(command));
} else {
pipe.write(command);
@@ -110,11 +110,12 @@
pipe.setReadReadyListener(this);
return true;
} else {
- if( wireFormat!=null ) {
- listener.onCommand(wireFormat.unmarshal((ByteSequence)o));
+ if (wireFormat != null) {
+ listener.onCommand(wireFormat.unmarshal((ByteSequence) o));
} else {
listener.onCommand(o);
}
+ return false;
}
} catch (IOException e) {
listener.onException(e);
@@ -192,6 +193,10 @@
public void setWireFormat(WireFormat wireFormat) {
this.wireFormat = wireFormat;
}
+
+ public void setDispatchPriority(int priority) {
+ readContext.updatePriority(priority);
+ }
}
private class PipeTransportServer implements TransportServer {
@@ -243,7 +248,7 @@
rc.setRemoteAddress(remoteAddress);
PipeTransport serverSide = new PipeTransport(pipe.connect());
serverSide.setRemoteAddress(remoteAddress);
- if( wireFormatFactory!=null ) {
+ if (wireFormatFactory != null) {
rc.setWireFormat(wireFormatFactory.createWireFormat());
serverSide.setWireFormat(wireFormatFactory.createWireFormat());
}
@@ -268,7 +273,7 @@
PipeTransportServer server = new PipeTransportServer();
server.setConnectURI(uri);
server.setName(node);
- if( options.containsKey("wireFormat") ) {
+ if (options.containsKey("wireFormat")) {
server.setWireFormatFactory(createWireFormatFactory(options));
}
servers.put(node, server);
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java Thu Mar 12 18:04:49 2009
@@ -1,336 +0,0 @@
-package org.apache.activemq.flow;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.flow.Commands.Destination;
-import org.apache.activemq.flow.Commands.FlowControl;
-import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
-import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
-import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
-import org.apache.activemq.flow.ISinkController.FlowControllable;
-import org.apache.activemq.flow.MockBroker.DeliveryTarget;
-import org.apache.activemq.queue.SingleFlowRelay;
-import org.apache.activemq.transport.DispatchableTransport;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportListener;
-
-public class RemoteConnection implements TransportListener, DeliveryTarget {
-
- protected Transport transport;
- protected MockBroker broker;
-
- protected final Object inboundMutex = new Object();
- protected IFlowController<Message> inboundController;
-
- protected SingleFlowRelay<Message> outputQueue;
- protected IFlowController<Message> outboundController;
- protected ProtocolLimiter<Message> outboundLimiter;
- protected Flow ouboundFlow;
-
- protected String name;
-
- private int priorityLevels;
-
- private final int outputWindowSize = 1000;
- private final int outputResumeThreshold = 900;
-
- private final int inputWindowSize = 1000;
- private final int inputResumeThreshold = 500;
-
- private IDispatcher dispatcher;
- private final AtomicBoolean stopping = new AtomicBoolean();
- protected Flow outputFlow;
- protected boolean blockingTransport = false;
- ExecutorService blockingWriter;
-
- public void setBroker(MockBroker broker) {
- this.broker = broker;
- }
-
- public void setTransport(Transport transport) {
- this.transport = transport;
- }
-
- public void start() throws Exception {
- transport.setTransportListener(this);
- transport.start();
- }
-
- public void stop() throws Exception {
- stopping.set(true);
- if (transport != null) {
- transport.stop();
- }
- if (blockingWriter != null) {
- blockingWriter.shutdown();
- }
- }
-
- public void onCommand(Object command) {
- try {
- // System.out.println("Got Command: " + command);
- // First command in should be the name of the connection
- if (name == null) {
- name = (String) command;
- initialize();
- } else if (command.getClass() == Message.class) {
- Message msg = (Message) command;
- inboundController.add(msg, null);
- } else if (command.getClass() == DestinationBuffer.class) {
- // This is a subscription request
- Destination destination = (Destination) command;
-
- broker.subscribe(destination, this);
- } else if (command.getClass() == FlowControlBuffer.class) {
- // This is a subscription request
- FlowControl fc = (FlowControl) command;
- synchronized (outputQueue) {
- outboundLimiter.onProtocolMessage(fc);
- }
- } else {
- onException(new Exception("Unrecognized command: " + command));
- }
- } catch (Exception e) {
- onException(e);
- }
- }
-
- protected void initialize() {
- // Setup the input processing..
- Flow flow = new Flow(name, false);
- WindowLimiter<Message> limiter = new WindowLimiter<Message>(false, flow, inputWindowSize, inputResumeThreshold);
-
- inboundController = new FlowController<Message>(new FlowControllable<Message>() {
- public void flowElemAccepted(ISourceController<Message> controller, Message elem) {
- messageReceived(controller, elem);
- }
-
- @Override
- public String toString() {
- return name;
- }
-
- public IFlowSink<Message> getFlowSink() {
- return null;
- }
-
- public IFlowSource<Message> getFlowSource() {
- return null;
- }
- }, flow, limiter, inboundMutex);
-
- ouboundFlow = new Flow(name, false);
- outboundLimiter = new WindowLimiter<Message>(true, ouboundFlow, outputWindowSize, outputResumeThreshold);
- outputQueue = new SingleFlowRelay<Message>(ouboundFlow, name + "-outbound", outboundLimiter);
- outboundController = outputQueue.getFlowController(ouboundFlow);
-
- if (transport instanceof DispatchableTransport) {
- outputQueue.setDrain(new IFlowDrain<Message>() {
-
- public void drain(Message message, ISourceController<Message> controller) {
- write(message);
- }
- });
-
- } else {
- blockingTransport = true;
- blockingWriter = Executors.newSingleThreadExecutor();
- outputQueue.setDrain(new IFlowDrain<Message>() {
- public void drain(final Message message, ISourceController<Message> controller) {
- write(message);
- };
- });
- /*
- * // Setup output processing final Executor writer =
- * Executors.newSingleThreadExecutor(); FlowControllable<Message>
- * controllable = new FlowControllable<Message>() { public void
- * flowElemAccepted( final ISourceController<Message> controller,
- * final Message elem) { writer.execute(new Runnable() { public void
- * run() { if (!stopping.get()) { try { transport.oneway(elem);
- * controller.elementDispatched(elem); } catch (IOException e) {
- * onException(e); } } } }); }
- *
- * public IFlowSink<Message> getFlowSink() { return null; }
- *
- * public IFlowSource<Message> getFlowSource() { return null; } };
- *
- * if (priorityLevels <= 1) { outboundController = new
- * FlowController<Message>(controllable, flow, limiter,
- * outboundMutex); } else { PrioritySizeLimiter<Message> pl = new
- * PrioritySizeLimiter<Message>( outputWindowSize,
- * outputResumeThreshold, priorityLevels);
- * pl.setPriorityMapper(Message.PRIORITY_MAPPER); outboundController
- * = new PriorityFlowController<Message>( controllable, flow, pl,
- * outboundMutex); }
- */
- }
- // outputQueue.setDispatcher(dispatcher);
-
- }
-
- private final void write(final Object o) {
- synchronized (outputQueue) {
- if (!blockingTransport) {
- try {
- transport.oneway(o);
- } catch (IOException e) {
- onException(e);
- }
- } else {
- try {
- blockingWriter.execute(new Runnable() {
- public void run() {
- if (!stopping.get()) {
- try {
- transport.oneway(o);
- } catch (IOException e) {
- onException(e);
- }
- }
- }
- });
- } catch (RejectedExecutionException re) {
- //Must be shutting down.
- }
- }
- }
- }
-
- protected void messageReceived(ISourceController<Message> controller, Message elem) {
- broker.router.route(controller, elem);
- inboundController.elementDispatched(elem);
- }
-
- public void onException(IOException error) {
- onException((Exception) error);
- }
-
- public void onException(Exception error) {
- if (!stopping.get() && !broker.isStopping()) {
- System.out.println("RemoteConnection error: " + error);
- error.printStackTrace();
- }
- }
-
- public void transportInterupted() {
- }
-
- public void transportResumed() {
- }
-
- public String getName() {
- return name;
- }
-
- public int getPriorityLevels() {
- return priorityLevels;
- }
-
- public void setPriorityLevels(int priorityLevels) {
- this.priorityLevels = priorityLevels;
- }
-
- public IDispatcher getDispatcher() {
- return dispatcher;
- }
-
- public void setDispatcher(IDispatcher dispatcher) {
- this.dispatcher = dispatcher;
- if (transport instanceof DispatchableTransport) {
- DispatchableTransport dt = ((DispatchableTransport) transport);
- if (name != null) {
- dt.setName(name);
- }
- dt.setDispatcher(getDispatcher());
- }
- }
-
- public MockBroker getBroker() {
- return broker;
- }
-
- public int getOutputWindowSize() {
- return outputWindowSize;
- }
-
- public int getOutputResumeThreshold() {
- return outputResumeThreshold;
- }
-
- public int getInputWindowSize() {
- return inputWindowSize;
- }
-
- public int getInputResumeThreshold() {
- return inputResumeThreshold;
- }
-
- public IFlowSink<Message> getSink() {
- return outputQueue;
- }
-
- public boolean match(Message message) {
- return true;
- }
-
- private interface ProtocolLimiter<E> extends IFlowLimiter<E> {
- public void onProtocolMessage(FlowControl m);
- }
-
- private class WindowLimiter<E> extends SizeLimiter<E> implements ProtocolLimiter<E> {
- final Flow flow;
- final boolean clientMode;
- private int available;
-
- public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) {
- super(capacity, resumeThreshold);
- this.clientMode = clientMode;
- this.flow = flow;
- }
-
- public void reserve(E elem) {
- super.reserve(elem);
- if (!clientMode) {
- // System.out.println(RemoteConnection.this.name + " Reserved "
- // + this);
- }
- }
-
- public void releaseReserved(E elem) {
- super.reserve(elem);
- if (!clientMode) {
- // System.out.println(RemoteConnection.this.name +
- // " Released Reserved " + this);
- }
- }
-
- protected void remove(int size) {
- super.remove(size);
- if (!clientMode) {
- available += size;
- if (available >= capacity - resumeThreshold) {
- FlowControlBean fc = new FlowControlBean();
- fc.setCredit(available);
- write(fc.freeze());
- // System.out.println(RemoteConnection.this.name +
- // " Send Release " + available + this);
- available = 0;
- }
- }
- }
-
- public void onProtocolMessage(FlowControl m) {
- remove(m.getCredit());
- }
-
- public int getElementSize(Message m) {
- return m.getFlowLimiterSize();
- }
- }
-
-}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java Thu Mar 12 18:04:49 2009
@@ -9,7 +9,7 @@
import org.apache.activemq.transport.DispatchableTransport;
import org.apache.activemq.transport.TransportFactory;
-public class RemoteConsumer extends RemoteConnection{
+public class RemoteConsumer extends ClientConnection {
private final MetricCounter consumerRate = new MetricCounter();
@@ -18,52 +18,35 @@
private Destination destination;
private String selector;
- private boolean schedualWait;
-
+ private boolean schedualWait = true;
+
public void start() throws Exception {
consumerRate.name("Consumer " + name + " Rate");
totalConsumerRate.add(consumerRate);
-
- URI uri = broker.getConnectURI();
- transport = TransportFactory.compositeConnect(uri);
- if(transport instanceof DispatchableTransport)
- {
- DispatchableTransport dt = ((DispatchableTransport)transport);
- dt.setName(name);
- dt.setDispatcher(getDispatcher());
- schedualWait = true;
- }
- transport.setTransportListener(this);
- transport.start();
-
- // Let the remote side know our name.
- transport.oneway(name);
- // Sending the destination acts as the subscribe.
- transport.oneway(destination);
- super.initialize();
+ super.start();
+ // subscribe:
+ write(destination);
}
-
+
protected void messageReceived(final ISourceController<Message> controller, final Message elem) {
- if( schedualWait ) {
+ if (schedualWait) {
if (thinkTime > 0) {
- getDispatcher().schedule(new Runnable(){
+ getDispatcher().schedule(new Runnable() {
public void run() {
consumerRate.increment();
controller.elementDispatched(elem);
}
-
+
}, thinkTime, TimeUnit.MILLISECONDS);
-
- }
- else
- {
+
+ } else {
consumerRate.increment();
controller.elementDispatched(elem);
}
} else {
- if( thinkTime>0 ) {
+ if (thinkTime > 0) {
try {
Thread.sleep(thinkTime);
} catch (InterruptedException e) {
@@ -74,6 +57,10 @@
}
}
+ public MetricCounter getRate() {
+ return consumerRate;
+ }
+
public void setName(String name) {
this.name = name;
}
@@ -112,4 +99,5 @@
public void setSelector(String selector) {
this.selector = selector;
- }}
+ }
+}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Thu Mar 12 18:04:49 2009
@@ -1,6 +1,5 @@
package org.apache.activemq.flow;
-import java.net.URI;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
@@ -9,11 +8,8 @@
import org.apache.activemq.flow.ISinkController.FlowUnblockListener;
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.MetricCounter;
-import org.apache.activemq.transport.DispatchableTransport;
-import org.apache.activemq.transport.TransportFactory;
-
-public class RemoteProducer extends RemoteConnection implements Dispatchable, FlowUnblockListener<Message>{
+public class RemoteProducer extends ClientConnection implements Dispatchable, FlowUnblockListener<Message> {
private final MetricCounter rate = new MetricCounter();
@@ -29,104 +25,84 @@
private DispatchContext dispatchContext;
private String filler;
- private int payloadSize = 20;
-
+ private int payloadSize = 0;
+ IFlowController<Message> outboundController;
+
public void start() throws Exception {
-
- if( payloadSize>0 ) {
+
+ if (payloadSize > 0) {
StringBuilder sb = new StringBuilder(payloadSize);
- for( int i=0; i < payloadSize; ++i) {
- sb.append((char)('a'+(i%26)));
+ for (int i = 0; i < payloadSize; ++i) {
+ sb.append((char) ('a' + (i % 26)));
}
filler = sb.toString();
}
-
+
rate.name("Producer " + name + " Rate");
totalProducerRate.add(rate);
- URI uri = broker.getConnectURI();
- transport = TransportFactory.compositeConnect(uri);
- transport.setTransportListener(this);
- if(transport instanceof DispatchableTransport)
- {
- DispatchableTransport dt = ((DispatchableTransport)transport);
- dt.setName(name + "-client-transport");
- dt.setDispatcher(getDispatcher());
- }
- super.setTransport(transport);
-
- super.initialize();
- transport.start();
- // Let the remote side know our name.
- transport.oneway(name);
+ super.start();
+ outboundController = outputQueue.getFlowController(outboundFlow);
dispatchContext = getDispatcher().register(this, name + "-client");
dispatchContext.requestDispatch();
}
-
- public void stop() throws Exception
- {
- dispatchContext.close(false);
- super.stop();
- }
-
- public void onFlowUnblocked(ISinkController<Message> controller) {
- dispatchContext.requestDispatch();
- }
-
- public boolean dispatch() {
- while(true)
- {
-
- if(next == null)
- {
- int priority = this.priority;
- if (priorityMod > 0) {
- priority = counter % priorityMod == 0 ? 0 : priority;
- }
-
- next = new Message(messageIdGenerator.getAndIncrement(), producerId, createPayload(), null, destination, priority);
- if (property != null) {
- next.setProperty(property);
- }
- }
-
- //If flow controlled stop until flow control is lifted.
- if(outboundController.isSinkBlocked())
- {
- if(outboundController.addUnblockListener(this))
- {
- return true;
- }
- }
-
- getSink().add(next, null);
- rate.increment();
- next = null;
- }
- }
+
+ public void stop() throws Exception {
+ dispatchContext.close(false);
+ super.stop();
+ }
+
+ public void onFlowUnblocked(ISinkController<Message> controller) {
+ dispatchContext.requestDispatch();
+ }
+
+ public boolean dispatch() {
+ while (true) {
+
+ if (next == null) {
+ int priority = this.priority;
+ if (priorityMod > 0) {
+ priority = counter % priorityMod == 0 ? 0 : priority;
+ }
+
+ next = new Message(messageIdGenerator.getAndIncrement(), producerId, createPayload(), null, destination, priority);
+ if (property != null) {
+ next.setProperty(property);
+ }
+ }
+
+ // If flow controlled stop until flow control is lifted.
+ if (outboundController.isSinkBlocked()) {
+ if (outboundController.addUnblockListener(this)) {
+ return true;
+ }
+ }
+
+ getSink().add(next, null);
+ rate.increment();
+ next = null;
+ return false;
+ }
+ }
private String createPayload() {
- if( payloadSize>=0 ) {
+ if (payloadSize >= 0) {
StringBuilder sb = new StringBuilder(payloadSize);
sb.append(name);
sb.append(':');
sb.append(++counter);
sb.append(':');
int length = sb.length();
- if( length <= payloadSize ) {
- sb.append(filler.subSequence(0, payloadSize-length));
+ if (length <= payloadSize) {
+ sb.append(filler.subSequence(0, payloadSize - length));
return sb.toString();
} else {
- return sb.substring(0, payloadSize);
+ return sb.substring(0, payloadSize);
}
} else {
- return name+":"+(++counter);
+ return name + ":" + (++counter);
}
}
-
- public void setName(String name) {
- this.name = name;
- }
public AtomicLong getMessageIdGenerator() {
return messageIdGenerator;
@@ -195,5 +171,9 @@
public void setPayloadSize(int messageSize) {
this.payloadSize = messageSize;
}
-}
+ @Override
+ protected void messageReceived(ISourceController<Message> controller, Message elem) {
+ controller.elementDispatched(elem);
+ }
+}