You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/16 16:32:53 UTC
svn commit: r744939 [2/2] - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/flow/ main/java/org/apache/activemq/queue/
main/java/org/apache/activemq/transport/
main/java/org/apache/activemq/transport/nio/ main/java/org/apache/acti...
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java Mon Feb 16 15:32:50 2009
@@ -9,6 +9,11 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.flow.Pipe.ReadReadyListener;
+import org.apache.activemq.transport.DispatchableTransport;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
@@ -20,56 +25,97 @@
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
public class PipeTransportFactory extends TransportFactory {
-
- private final HashMap<String,PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
+
+ private final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
static final AtomicInteger connectionCounter = new AtomicInteger();
-
- private static class PipeTransport implements Transport, Runnable {
+
+ private static class PipeTransport implements DispatchableTransport, Dispatchable, Runnable, ReadReadyListener<Object> {
private final Pipe<Object> pipe;
private TransportListener listener;
private String remoteAddress;
private AtomicBoolean stopping = new AtomicBoolean();
private Thread thread;
+ private DispatchContext readContext;
+ private String name;
public PipeTransport(Pipe<Object> pipe) {
this.pipe = pipe;
}
public void start() throws Exception {
- thread = new Thread(this, getRemoteAddress());
- thread.start();
+ if (readContext != null) {
+ pipe.setMode(Pipe.ASYNC);
+ readContext.requestDispatch();
+ } else {
+ thread = new Thread(this, getRemoteAddress());
+ thread.start();
+ }
}
public void stop() throws Exception {
- stopping.set(true);
- thread.join();
+ if (readContext != null) {
+ readContext.close();
+ } else {
+ stopping.set(true);
+ thread.join();
+ }
+ }
+
+ public void setDispatcher(IDispatcher dispatcher) {
+ readContext = dispatcher.register(this, name);
}
-
+
+ public void onReadReady(Pipe<Object> pipe) {
+ if (readContext != null) {
+ readContext.requestDispatch();
+ }
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
public void oneway(Object command) throws IOException {
+
try {
- while( !stopping.get() ) {
- if( pipe.offer(command, 500, TimeUnit.MILLISECONDS) ) {
- break;
- }
- }
+ pipe.write(command);
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
+ /*
+ * try { while( !stopping.get() ) { if( pipe.offer(command, 500,
+ * TimeUnit.MILLISECONDS) ) { break; } } } catch
+ * (InterruptedException e) { throw new InterruptedIOException(); }
+ */
+ }
+
+ public boolean dispatch() {
+ while (true) {
+
+ Object o = pipe.poll();
+ if (o == null) {
+ pipe.setReadReadyListener(this);
+ return true;
+ } else {
+ listener.onCommand(o);
+ }
+ }
}
public void run() {
+
try {
- while( !stopping.get() ) {
+ while (!stopping.get()) {
Object value = pipe.poll(500, TimeUnit.MILLISECONDS);
- if( value!=null ) {
+ if (value != null) {
listener.onCommand(value);
}
}
} catch (InterruptedException e) {
}
}
-
+
public String getRemoteAddress() {
return remoteAddress;
}
@@ -100,12 +146,11 @@
public void reconnect(URI uri) throws IOException {
throw new UnsupportedOperationException();
}
-
+
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
throw new UnsupportedOperationException();
}
-
public Object request(Object command) throws IOException {
throw new UnsupportedOperationException();
}
@@ -120,10 +165,12 @@
public void setRemoteAddress(String remoteAddress) {
this.remoteAddress = remoteAddress;
+ if (name == null) {
+ name = remoteAddress;
+ }
}
-
}
-
+
private class PipeTransportServer implements TransportServer {
private URI connectURI;
private TransportAcceptListener listener;
@@ -165,9 +212,9 @@
public Transport connect() {
int connectionId = connectionCounter.incrementAndGet();
- String remoteAddress = connectURI.toString()+"#"+connectionId;
- assert listener!= null: "Server does not have an accept listener";
- Pipe<Object> pipe = new Pipe<Object>(10);
+ String remoteAddress = connectURI.toString() + "#" + connectionId;
+ assert listener != null : "Server does not have an accept listener";
+ Pipe<Object> pipe = new Pipe<Object>();
PipeTransport rc = new PipeTransport(pipe);
rc.setRemoteAddress(remoteAddress);
PipeTransport serverSide = new PipeTransport(pipe.connect());
@@ -176,12 +223,12 @@
return rc;
}
}
-
+
@Override
public synchronized TransportServer doBind(URI uri) throws IOException {
String node = uri.getHost();
- if( servers.containsKey(node) ) {
- throw new IOException("Server allready bound: "+node);
+ if (servers.containsKey(node)) {
+ throw new IOException("Server allready bound: " + node);
}
PipeTransportServer server = new PipeTransportServer();
server.setConnectURI(uri);
@@ -189,7 +236,7 @@
servers.put(node, server);
return server;
}
-
+
private synchronized void unbind(PipeTransportServer server) {
servers.remove(server.getName());
}
@@ -197,9 +244,9 @@
@Override
public synchronized Transport doCompositeConnect(URI location) throws Exception {
String name = location.getHost();
- PipeTransportServer server = servers.get(name );
- if( server==null ) {
- throw new IOException("Server is not bound: "+name);
+ PipeTransportServer server = servers.get(name);
+ if (server == null) {
+ throw new IOException("Server is not bound: " + name);
}
return server.connect();
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java Mon Feb 16 15:32:50 2009
@@ -5,16 +5,26 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.FlowControl;
import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.StatefulWireFormat;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
public class ProtoWireFormatFactory implements WireFormatFactory {
- static public class TestWireFormat implements WireFormat {
-
+ public class TestWireFormat implements StatefulWireFormat {
+ private ByteBuffer currentOut;
+ private byte outType;
+
+ private ByteBuffer currentIn;
+ private byte inType;
+
public void marshal(Object value, DataOutput out) throws IOException {
if( value.getClass() == Message.class ) {
out.writeByte(0);
@@ -25,6 +35,9 @@
} else if( value.getClass() == Destination.class ) {
out.writeByte(2);
((Destination)value).writeFramed((OutputStream)out);
+ }else if( value.getClass() == FlowControl.class ) {
+ out.writeByte(3);
+ ((FlowControl)value).writeFramed((OutputStream)out);
} else {
throw new IOException("Unsupported type: "+value.getClass());
}
@@ -43,11 +56,160 @@
Destination d = new Destination();
d.mergeFramed((InputStream)in);
return d;
+ case 3:
+ FlowControl fc = new FlowControl();
+ fc.mergeFramed((InputStream)in);
+ return fc;
default:
throw new IOException("Unknonw type byte: ");
}
}
+ public boolean marshal(Object value, ByteBuffer target) throws IOException
+ {
+ if(currentOut == null)
+ {
+ //Ensure room for type byte and length byte:
+ if(target.remaining() < 5)
+ {
+ return false;
+ }
+
+ if( value.getClass() == Message.class ) {
+
+ currentOut = ByteBuffer.wrap(((Message)value).getProto().toFramedByteArray());
+ outType = 0;
+ } else if( value.getClass() == String.class ) {
+ outType = 1;
+ try {
+ currentOut = ByteBuffer.wrap(((String)value).getBytes("utf-8"));
+ } catch (UnsupportedEncodingException e) {
+ //Shouldn't happen.
+ throw IOExceptionSupport.create(e);
+ }
+ } else if( value.getClass() == Destination.class ) {
+ outType = 2;
+ currentOut = ByteBuffer.wrap(((Destination)value).toFramedByteArray());
+ }else if( value.getClass() == FlowControl.class ) {
+ outType = 3;
+ currentOut = ByteBuffer.wrap(((FlowControl)value).toFramedByteArray());
+ }else {
+ throw new IOException("Unsupported type: "+value.getClass());
+ }
+
+ //Write type:
+ target.put(outType);
+ //Write length:
+ target.putInt(currentOut.remaining());
+ if(currentOut.remaining() > 1024*1024)
+ {
+ throw new IOException("Packet exceeded max memory size!");
+ }
+ }
+
+ //Avoid overflow:
+ if(currentOut.remaining() > target.remaining())
+ {
+ int limit = currentOut.limit();
+ currentOut.limit(currentOut.position() + target.remaining());
+ target.put(currentOut);
+ currentOut.limit(limit);
+ }
+ else
+ {
+ target.put(currentOut);
+ }
+
+ if(!currentOut.hasRemaining())
+ {
+ currentOut = null;
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Unmarshals an object. When the object is read it is returned.
+ * @param source
+ * @return The object when unmarshalled, null otherwise
+ */
+ public Object unMarshal(ByteBuffer source) throws IOException
+ {
+ if(currentIn == null)
+ {
+ if(source.remaining() < 5)
+ {
+ return null;
+ }
+
+ inType = source.get();
+ int length = source.getInt();
+ if(length > 1024*1024)
+ {
+ throw new IOException("Packet exceeded max memory size!");
+ }
+ currentIn = ByteBuffer.wrap(new byte[length]);
+
+ }
+
+ if(!source.hasRemaining())
+ {
+ return null;
+ }
+
+ if(source.remaining() > currentIn.remaining())
+ {
+ int limit = source.limit();
+ source.limit(source.position() + currentIn.remaining());
+ currentIn.put(source);
+ source.limit(limit);
+ }
+ else
+ {
+ currentIn.put(source);
+ }
+
+ //If we haven't finished the packet return to get more data:
+ if(currentIn.hasRemaining())
+ {
+ return null;
+ }
+
+ Object ret = null;
+ switch(inType) {
+ case 0:
+ Commands.Message m = new Commands.Message();
+ try
+ {
+ m.mergeFramed(currentIn.array());
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ ret = new Message(m);
+ break;
+ case 1:
+ ret = new String(currentIn.array(), "utf-8");
+ break;
+ case 2:
+ Destination d = new Destination();
+ d.mergeFramed(currentIn.array());
+ ret = d;
+ break;
+ case 3:
+ FlowControl c = new FlowControl();
+ c.mergeFramed(currentIn.array());
+ ret = c;
+ break;
+ default:
+ throw new IOException("Unknown type byte: " + inType);
+ }
+
+ currentIn = null;
+ return ret;
+ }
+
public int getVersion() {
return 0;
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java Mon Feb 16 15:32:50 2009
@@ -7,36 +7,42 @@
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.FlowControl;
import org.apache.activemq.flow.ISinkController.FlowControllable;
import org.apache.activemq.flow.MockBroker.DeliveryTarget;
+import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.transport.DispatchableTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
public class RemoteConnection implements TransportListener, DeliveryTarget {
-
protected Transport transport;
protected MockBroker broker;
protected final Object inboundMutex = new Object();
- protected FlowController<Message> inboundController;
+ protected IFlowController<Message> inboundController;
+
+ protected SingleFlowRelay<Message> outputQueue;
+ protected IFlowController<Message> outboundController;
+ protected ProtocolLimiter<Message> outboundLimiter;
+ protected Flow ouboundFlow;
- protected final Object outboundMutex = new Object();
- protected IFlowSink<Message> outboundController;
protected String name;
private int priorityLevels;
private final int outputWindowSize = 1000;
- private final int outputResumeThreshold = 500;
+ private final int outputResumeThreshold = 900;
private final int inputWindowSize = 1000;
private final int inputResumeThreshold = 900;
private IDispatcher dispatcher;
- private ExecutorService writer;
-
private final AtomicBoolean stopping = new AtomicBoolean();
+ protected Flow outputFlow;
+ protected boolean blockingTransport = false;
+ ExecutorService blockingWriter;
public void setBroker(MockBroker broker) {
this.broker = broker;
@@ -53,51 +59,58 @@
public void stop() throws Exception {
stopping.set(true);
- writer.shutdown();
if (transport != null) {
transport.stop();
}
+ if (blockingWriter != null) {
+ blockingWriter.shutdown();
+ }
}
public void onCommand(Object command) {
try {
+ // System.out.println("Got Command: " + command);
// First command in should be the name of the connection
- if( name==null ) {
+ if (name == null) {
name = (String) command;
initialize();
} else if (command.getClass() == Message.class) {
Message msg = (Message) command;
- // Use the flow controller to send the message on so that we do
- // not overflow
- // the broker.
- while (!inboundController.offer(msg, null)) {
- inboundController.waitForFlowUnblock();
- }
+ inboundController.add(msg, null);
} else if (command.getClass() == Destination.class) {
// This is a subscription request
Destination destination = (Destination) command;
+
broker.subscribe(destination, this);
+ } else if (command.getClass() == FlowControl.class) {
+ // This is a subscription request
+ FlowControl fc = (FlowControl) command;
+ synchronized (outputQueue) {
+ outboundLimiter.onProtocolMessage(fc);
+ }
+ } else {
+ onException(new Exception("Unrecognized command: " + command));
}
} catch (Exception e) {
onException(e);
}
}
- private void initialize() {
+ protected void initialize() {
// Setup the input processing..
- SizeLimiter<Message> limiter = new SizeLimiter<Message>(inputWindowSize, inputResumeThreshold);
- Flow flow = new Flow(name + "-inbound", false);
+ Flow flow = new Flow(name, false);
+ WindowLimiter<Message> limiter = new WindowLimiter<Message>(false, flow, inputWindowSize, inputResumeThreshold);
+
inboundController = new FlowController<Message>(new FlowControllable<Message>() {
public void flowElemAccepted(ISourceController<Message> controller, Message elem) {
- broker.router.route(controller, elem);
- inboundController.elementDispatched(elem);
+ messageReceived(controller, elem);
}
@Override
public String toString() {
return name;
}
-
+
public IFlowSink<Message> getFlowSink() {
return null;
}
@@ -107,16 +120,69 @@
}
}, flow, limiter, inboundMutex);
- // Setup output processing
- writer = Executors.newSingleThreadExecutor();
- FlowControllable<Message> controllable = new FlowControllable<Message>(){
- public void flowElemAccepted(final ISourceController<Message> controller, final Message elem) {
- writer.execute(new Runnable() {
+ ouboundFlow = new Flow(name, false);
+ outboundLimiter = new WindowLimiter<Message>(true, ouboundFlow, outputWindowSize, outputResumeThreshold);
+ outputQueue = new SingleFlowRelay<Message>(ouboundFlow, name + "-outbound", outboundLimiter);
+ outboundController = outputQueue.getFlowController(ouboundFlow);
+
+ if (transport instanceof DispatchableTransport) {
+ outputQueue.setDrain(new IFlowDrain<Message>() {
+
+ public void drain(Message message, ISourceController<Message> controller) {
+ write(message);
+ }
+ });
+
+ } else {
+ blockingTransport = true;
+ blockingWriter = Executors.newSingleThreadExecutor();
+ outputQueue.setDrain(new IFlowDrain<Message>() {
+ public void drain(final Message message, ISourceController<Message> controller) {
+ write(message);
+ };
+ });
+ /*
+ * // Setup output processing final Executor writer =
+ * Executors.newSingleThreadExecutor(); FlowControllable<Message>
+ * controllable = new FlowControllable<Message>() { public void
+ * flowElemAccepted( final ISourceController<Message> controller,
+ * final Message elem) { writer.execute(new Runnable() { public void
+ * run() { if (!stopping.get()) { try { transport.oneway(elem);
+ * controller.elementDispatched(elem); } catch (IOException e) {
+ * onException(e); } } } }); }
+ *
+ * public IFlowSink<Message> getFlowSink() { return null; }
+ *
+ * public IFlowSource<Message> getFlowSource() { return null; } };
+ *
+ * if (priorityLevels <= 1) { outboundController = new
+ * FlowController<Message>(controllable, flow, limiter,
+ * outboundMutex); } else { PrioritySizeLimiter<Message> pl = new
+ * PrioritySizeLimiter<Message>( outputWindowSize,
+ * outputResumeThreshold, priorityLevels);
+ * pl.setPriorityMapper(Message.PRIORITY_MAPPER); outboundController
+ * = new PriorityFlowController<Message>( controllable, flow, pl,
+ * outboundMutex); }
+ */
+ }
+ // outputQueue.setDispatcher(dispatcher);
+
+ }
+
+ private final void write(final Object o) {
+ synchronized (outputQueue) {
+ if (!blockingTransport) {
+ try {
+ transport.oneway(o);
+ } catch (IOException e) {
+ onException(e);
+ }
+ } else {
+ blockingWriter.execute(new Runnable() {
public void run() {
if (!stopping.get()) {
try {
- transport.oneway(elem);
- controller.elementDispatched(elem);
+ transport.oneway(o);
} catch (IOException e) {
onException(e);
}
@@ -124,33 +190,21 @@
}
});
}
- public IFlowSink<Message> getFlowSink() {
- return null;
- }
- public IFlowSource<Message> getFlowSource() {
- return null;
- }
- };
-
- flow = new Flow(name + "-outbound", false);
- if (priorityLevels <= 1) {
- limiter = new SizeLimiter<Message>(outputWindowSize, outputResumeThreshold);
- outboundController = new FlowController<Message>(controllable, flow, limiter, outboundMutex);
- } else {
- PrioritySizeLimiter<Message> pl = new PrioritySizeLimiter<Message>(outputWindowSize, outputResumeThreshold, priorityLevels);
- pl.setPriorityMapper(Message.PRIORITY_MAPPER);
- outboundController = new PriorityFlowController<Message>(controllable, flow, pl, outboundMutex);
}
+ }
+ protected void messageReceived(ISourceController<Message> controller, Message elem) {
+ broker.router.route(controller, elem);
+ inboundController.elementDispatched(elem);
}
public void onException(IOException error) {
- onException((Exception)error);
+ onException((Exception) error);
}
public void onException(Exception error) {
if (!stopping.get() && !broker.isStopping()) {
- System.out.println("RemoteConnection error: "+error);
+ System.out.println("RemoteConnection error: " + error);
error.printStackTrace();
}
}
@@ -179,6 +233,13 @@
public void setDispatcher(IDispatcher dispatcher) {
this.dispatcher = dispatcher;
+ if (transport instanceof DispatchableTransport) {
+ DispatchableTransport dt = ((DispatchableTransport) transport);
+ if (name != null) {
+ dt.setName(name);
+ }
+ dt.setDispatcher(getDispatcher());
+ }
}
public MockBroker getBroker() {
@@ -202,11 +263,66 @@
}
public IFlowSink<Message> getSink() {
- return outboundController;
+ return outputQueue;
}
public boolean match(Message message) {
return true;
}
+ private interface ProtocolLimiter<E> extends IFlowLimiter<E> {
+ public void onProtocolMessage(FlowControl m);
+ }
+
+ private class WindowLimiter<E> extends SizeLimiter<E> implements ProtocolLimiter<E> {
+ final Flow flow;
+ final boolean clientMode;
+ private int available;
+
+ public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) {
+ super(capacity, resumeThreshold);
+ this.clientMode = clientMode;
+ this.flow = flow;
+ }
+
+ public void reserve(E elem) {
+ super.reserve(elem);
+ if (!clientMode) {
+ // System.out.println(RemoteConnection.this.name + " Reserved "
+ // + this);
+ }
+ }
+
+ public void releaseReserved(E elem) {
+ super.reserve(elem);
+ if (!clientMode) {
+ // System.out.println(RemoteConnection.this.name +
+ // " Released Reserved " + this);
+ }
+ }
+
+ protected void remove(int size) {
+ super.remove(size);
+ if (!clientMode) {
+ available += size;
+ if (available >= capacity - resumeThreshold) {
+ FlowControl fc = new FlowControl();
+ fc.setCredit(available);
+ write(fc);
+ // System.out.println(RemoteConnection.this.name +
+ // " Send Release " + available + this);
+ available = 0;
+ }
+ }
+ }
+
+ public void onProtocolMessage(FlowControl m) {
+ remove(m.getCredit());
+ }
+
+ public int getElementSize(Message m) {
+ return m.getFlowLimiterSize();
+ }
+ }
+
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java Mon Feb 16 15:32:50 2009
@@ -2,23 +2,20 @@
import java.io.IOException;
import java.net.URI;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.transport.DispatchableTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportListener;
-public class RemoteConsumer implements TransportListener {
+public class RemoteConsumer extends RemoteConnection{
- private final AtomicBoolean stopping = new AtomicBoolean();
private final MetricCounter consumerRate = new MetricCounter();
- private Transport transport;
- private MockBroker broker;
- private String name;
private MetricAggregator totalConsumerRate;
private long thinkTime;
private Destination destination;
@@ -30,6 +27,12 @@
URI uri = broker.getConnectURI();
transport = TransportFactory.compositeConnect(uri);
+ if(transport instanceof DispatchableTransport)
+ {
+ DispatchableTransport dt = ((DispatchableTransport)transport);
+ dt.setName(name);
+ dt.setDispatcher(getDispatcher());
+ }
transport.setTransportListener(this);
transport.start();
@@ -37,58 +40,31 @@
transport.oneway(name);
// Sending the destination acts as the subscribe.
transport.oneway(destination);
+ super.initialize();
}
- public void stop() throws Exception {
- stopping.set(true);
- if( transport!=null ) {
- transport.stop();
- transport=null;
- }
- }
-
- public void onCommand(Object command) {
- if( command.getClass() == Message.class ) {
-
- if (thinkTime > 0) {
- try {
- Thread.sleep(thinkTime);
- } catch (InterruptedException e) {
+ protected void messageReceived(final ISourceController<Message> controller, final Message elem) {
+ if (thinkTime > 0) {
+ getDispatcher().schedule(new Runnable(){
+
+ public void run() {
+ consumerRate.increment();
+ controller.elementDispatched(elem);
}
- }
- consumerRate.increment();
+
+ }, thinkTime, TimeUnit.MILLISECONDS);
- } else {
- System.out.println("Unhandled command: "+command);
- }
- }
-
- public void onException(IOException error) {
- if( !stopping.get() ) {
- System.out.println("RemoteConsumer error: "+error);
- error.printStackTrace();
}
- }
-
- public void transportInterupted() {
- }
- public void transportResumed() {
+ else
+ {
+ consumerRate.increment();
+ controller.elementDispatched(elem);
+ }
}
public void setName(String name) {
this.name = name;
}
- public void setBroker(MockBroker broker) {
- this.broker = broker;
- }
-
- public MockBroker getBroker() {
- return broker;
- }
-
- public String getName() {
- return name;
- }
public MetricAggregator getTotalConsumerRate() {
return totalConsumerRate;
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Mon Feb 16 15:32:50 2009
@@ -5,22 +5,21 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.ISinkController.FlowUnblockListener;
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.transport.DispatchableTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
-public class RemoteProducer implements TransportListener, Runnable {
+public class RemoteProducer extends RemoteConnection implements Dispatchable, FlowUnblockListener<Message>{
- private final AtomicBoolean stopping = new AtomicBoolean();
private final MetricCounter rate = new MetricCounter();
- private Transport transport;
- private MockBroker broker;
- private String name;
- private Thread thread;
private AtomicLong messageIdGenerator;
private int priority;
private int priorityMod;
@@ -29,6 +28,8 @@
private Destination destination;
private String property;
private MetricAggregator totalProducerRate;
+ Message next;
+ private DispatchContext dispatchContext;
public void start() throws Exception {
rate.name("Producer " + name + " Rate");
@@ -37,68 +38,67 @@
URI uri = broker.getConnectURI();
transport = TransportFactory.compositeConnect(uri);
transport.setTransportListener(this);
+ if(transport instanceof DispatchableTransport)
+ {
+ DispatchableTransport dt = ((DispatchableTransport)transport);
+ dt.setName(name);
+ dt.setDispatcher(getDispatcher());
+ }
+ super.setTransport(transport);
+
+ super.initialize();
transport.start();
-
// Let the remote side know our name.
transport.oneway(name);
-
- thread = new Thread(this, name);
- thread.start();
+ dispatchContext = getDispatcher().register(this, name + "-producer");
+ dispatchContext.requestDispatch();
}
- public void stop() throws Exception {
- stopping.set(true);
- if( transport!=null ) {
- transport.stop();
- }
- thread.join();
- transport=null;
- }
-
- public void run() {
- try {
- while( !stopping.get() ) {
-
- int priority = this.priority;
- if (priorityMod > 0) {
- priority = counter % priorityMod == 0 ? 0 : priority;
- }
-
- Message next = new Message(messageIdGenerator.getAndIncrement(), producerId, name + ++counter, null, destination, priority);
- if (property != null) {
- next.setProperty(property);
- }
-
- transport.oneway(next);
- rate.increment();
- }
- } catch (IOException e) {
- onException(e);
- }
- }
-
- public void onCommand(Object command) {
- System.out.println("Unhandled command: "+command);
- }
-
- public void onException(IOException error) {
- if( !stopping.get() ) {
- System.out.println("RemoteProducer error: "+error);
- error.printStackTrace();
- }
- }
-
- public void transportInterupted() {
- }
- public void transportResumed() {
- }
-
- public void setName(String name) {
+ public void stop() throws Exception
+ {
+ dispatchContext.close();
+ super.stop();
+ }
+
+ public void onFlowUnblocked(ISinkController<Message> controller) {
+ dispatchContext.requestDispatch();
+ }
+
+ public boolean dispatch() {
+ while(true)
+ {
+
+ if(next == null)
+ {
+ int priority = this.priority;
+ if (priorityMod > 0) {
+ priority = counter % priorityMod == 0 ? 0 : priority;
+ }
+
+ next = new Message(messageIdGenerator.getAndIncrement(), producerId, name + ++counter, null, destination, priority);
+ if (property != null) {
+ next.setProperty(property);
+ }
+ }
+
+ //If flow controlled stop until flow control is lifted.
+ if(outboundController.isSinkBlocked())
+ {
+ if(outboundController.addUnblockListener(this))
+ {
+ return true;
+ }
+ }
+
+ getSink().add(next, null);
+ rate.increment();
+ next = null;
+ }
+ }
+
+ public void setName(String name) {
this.name = name;
}
- public void setBroker(MockBroker broker) {
- this.broker = broker;
- }
public AtomicLong getMessageIdGenerator() {
return messageIdGenerator;
@@ -148,14 +148,6 @@
this.property = property;
}
- public MockBroker getBroker() {
- return broker;
- }
-
- public String getName() {
- return name;
- }
-
public MetricAggregator getTotalProducerRate() {
return totalProducerRate;
}
@@ -166,4 +158,6 @@
public MetricCounter getRate() {
return rate;
- }}
+ }
+}
+