You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/16 16:32:53 UTC

svn commit: r744939 [2/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/flow/ main/java/org/apache/activemq/queue/ main/java/org/apache/activemq/transport/ main/java/org/apache/activemq/transport/nio/ main/java/org/apache/acti...

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java Mon Feb 16 15:32:50 2009
@@ -9,6 +9,11 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.flow.Pipe.ReadReadyListener;
+import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
@@ -20,56 +25,97 @@
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
 
 public class PipeTransportFactory extends TransportFactory {
-    
-    private final HashMap<String,PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
+
+    private final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
     static final AtomicInteger connectionCounter = new AtomicInteger();
-    
-    private static class PipeTransport implements Transport, Runnable {
+
+    private static class PipeTransport implements DispatchableTransport, Dispatchable, Runnable, ReadReadyListener<Object> {
 
         private final Pipe<Object> pipe;
         private TransportListener listener;
         private String remoteAddress;
         private AtomicBoolean stopping = new AtomicBoolean();
         private Thread thread;
+        private DispatchContext readContext;
+        private String name;
 
         public PipeTransport(Pipe<Object> pipe) {
             this.pipe = pipe;
         }
 
         public void start() throws Exception {
-            thread = new Thread(this, getRemoteAddress());
-            thread.start();
+            if (readContext != null) {
+                pipe.setMode(Pipe.ASYNC);
+                readContext.requestDispatch();
+            } else {
+                thread = new Thread(this, getRemoteAddress());
+                thread.start();
+            }
         }
 
         public void stop() throws Exception {
-            stopping.set(true);
-            thread.join();
+            if (readContext != null) {
+                readContext.close();
+            } else {
+                stopping.set(true);
+                thread.join();
+            }
+        }
+
+        public void setDispatcher(IDispatcher dispatcher) { // registers this transport with the given dispatcher
+            readContext = dispatcher.register(this, name); // start(), stop() and onReadReady() use this context whenever it is non-null
         }
-        
+
+        /**
+         * Pipe callback: asks the dispatcher to schedule this transport again.
+         * Does nothing when no dispatch context has been registered.
+         *
+         * @param pipe the pipe that reported readiness (not used here)
+         */
+        public void onReadReady(Pipe<Object> pipe) {
+            if (readContext == null) {
+                return;
+            }
+            readContext.requestDispatch();
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
         public void oneway(Object command) throws IOException {
+
             try {
-                while( !stopping.get() ) {
-                    if( pipe.offer(command, 500, TimeUnit.MILLISECONDS) ) {
-                        break;
-                    }
-                }
+                pipe.write(command);
             } catch (InterruptedException e) {
                 throw new InterruptedIOException();
             }
+            /*
+             * try { while( !stopping.get() ) { if( pipe.offer(command, 500,
+             * TimeUnit.MILLISECONDS) ) { break; } } } catch
+             * (InterruptedException e) { throw new InterruptedIOException(); }
+             */
+        }
+
+        /**
+         * Hands every object currently available from the pipe to the
+         * transport listener. Once poll() yields null, this transport is
+         * installed as the pipe's read-ready listener.
+         *
+         * @return always true
+         */
+        public boolean dispatch() {
+            for (Object next = pipe.poll(); next != null; next = pipe.poll()) {
+                listener.onCommand(next);
+            }
+            pipe.setReadReadyListener(this);
+            return true;
+        }
 
         public void run() {
+
             try {
-                while( !stopping.get() ) {
+                while (!stopping.get()) {
                     Object value = pipe.poll(500, TimeUnit.MILLISECONDS);
-                    if( value!=null ) {
+                    if (value != null) {
                         listener.onCommand(value);
                     }
                 }
             } catch (InterruptedException e) {
             }
         }
-        
+
         public String getRemoteAddress() {
             return remoteAddress;
         }
@@ -100,12 +146,11 @@
         public void reconnect(URI uri) throws IOException {
             throw new UnsupportedOperationException();
         }
-        
+
         public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
             throw new UnsupportedOperationException();
         }
 
-
         public Object request(Object command) throws IOException {
             throw new UnsupportedOperationException();
         }
@@ -120,10 +165,12 @@
 
         public void setRemoteAddress(String remoteAddress) {
             this.remoteAddress = remoteAddress;
+            if (name == null) {
+                name = remoteAddress;
+            }
         }
-
     }
-    
+
     private class PipeTransportServer implements TransportServer {
         private URI connectURI;
         private TransportAcceptListener listener;
@@ -165,9 +212,9 @@
 
         public Transport connect() {
             int connectionId = connectionCounter.incrementAndGet();
-            String remoteAddress = connectURI.toString()+"#"+connectionId;
-            assert listener!= null: "Server does not have an accept listener";
-            Pipe<Object> pipe = new Pipe<Object>(10);
+            String remoteAddress = connectURI.toString() + "#" + connectionId;
+            assert listener != null : "Server does not have an accept listener";
+            Pipe<Object> pipe = new Pipe<Object>();
             PipeTransport rc = new PipeTransport(pipe);
             rc.setRemoteAddress(remoteAddress);
             PipeTransport serverSide = new PipeTransport(pipe.connect());
@@ -176,12 +223,12 @@
             return rc;
         }
     }
-    
+
     @Override
     public synchronized TransportServer doBind(URI uri) throws IOException {
         String node = uri.getHost();
-        if( servers.containsKey(node) ) {
-            throw new IOException("Server allready bound: "+node);
+        if (servers.containsKey(node)) {
+            throw new IOException("Server allready bound: " + node);
         }
         PipeTransportServer server = new PipeTransportServer();
         server.setConnectURI(uri);
@@ -189,7 +236,7 @@
         servers.put(node, server);
         return server;
     }
-    
+
     private synchronized void unbind(PipeTransportServer server) {
         servers.remove(server.getName());
     }
@@ -197,9 +244,9 @@
     @Override
     public synchronized Transport doCompositeConnect(URI location) throws Exception {
         String name = location.getHost();
-        PipeTransportServer server = servers.get(name );
-        if( server==null ) {
-            throw new IOException("Server is not bound: "+name);
+        PipeTransportServer server = servers.get(name);
+        if (server == null) {
+            throw new IOException("Server is not bound: " + name);
         }
         return server.connect();
     }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java Mon Feb 16 15:32:50 2009
@@ -5,16 +5,26 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 
 import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.FlowControl;
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.StatefulWireFormat;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
 
 public class ProtoWireFormatFactory implements WireFormatFactory {
 
-    static public class TestWireFormat implements WireFormat {
-
+    public class TestWireFormat implements StatefulWireFormat {
+        private ByteBuffer currentOut;
+        private byte outType;
+        
+        private ByteBuffer currentIn;
+        private byte inType;
+        
         public void marshal(Object value, DataOutput out) throws IOException {
             if( value.getClass() == Message.class ) {
                 out.writeByte(0);
@@ -25,6 +35,9 @@
             } else if( value.getClass() == Destination.class ) {
                 out.writeByte(2);
                 ((Destination)value).writeFramed((OutputStream)out);
+            }else if( value.getClass() == FlowControl.class ) {
+                out.writeByte(3);
+                ((FlowControl)value).writeFramed((OutputStream)out);
             } else {
                 throw new IOException("Unsupported type: "+value.getClass());
             }
@@ -43,11 +56,160 @@
                     Destination d = new Destination();
                     d.mergeFramed((InputStream)in);
                     return d;
+                case 3:
+                    FlowControl fc = new FlowControl();
+                    fc.mergeFramed((InputStream)in);
+                    return fc;
                 default:
                     throw new IOException("Unknonw type byte: ");
             }
         }
 
+        /**
+         * Incrementally marshals a value into the target buffer as one frame:
+         * a type byte, a four byte payload length, then the payload bytes.
+         * When the target cannot hold the whole payload the remainder is kept
+         * in {@code currentOut} and written by subsequent calls; while such a
+         * frame is pending the {@code value} argument is ignored.
+         *
+         * @param value the object to encode (Message, String, Destination or FlowControl)
+         * @param target the buffer to write into
+         * @return true once the complete frame has been written, false if the
+         *         method must be called again with more room in the target
+         * @throws IOException if the value type is unsupported or its encoded
+         *         form is larger than 1 MiB; in both cases nothing is written
+         *         and no partial frame is left pending
+         */
+        public boolean marshal(Object value, ByteBuffer target) throws IOException
+        {
+            if(currentOut == null)
+            {
+                //Ensure room for the type byte and the four byte length:
+                if(target.remaining() < 5)
+                {
+                    return false;
+                }
+
+                byte type;
+                byte[] payload;
+                if( value.getClass() == Message.class ) {
+                    type = 0;
+                    payload = ((Message)value).getProto().toFramedByteArray();
+                } else if( value.getClass() == String.class ) {
+                    type = 1;
+                    try {
+                        payload = ((String)value).getBytes("utf-8");
+                    } catch (UnsupportedEncodingException e) {
+                        //Shouldn't happen.
+                        throw IOExceptionSupport.create(e);
+                    }
+                } else if( value.getClass() == Destination.class ) {
+                    type = 2;
+                    payload = ((Destination)value).toFramedByteArray();
+                } else if( value.getClass() == FlowControl.class ) {
+                    type = 3;
+                    payload = ((FlowControl)value).toFramedByteArray();
+                } else {
+                    throw new IOException("Unsupported type: "+value.getClass());
+                }
+
+                //Validate before touching the target or our own state, so a
+                //rejected value cannot leave a half written frame behind:
+                if(payload.length > 1024*1024)
+                {
+                    throw new IOException("Packet exceeded max memory size!");
+                }
+
+                outType = type;
+                currentOut = ByteBuffer.wrap(payload);
+                //Write type:
+                target.put(outType);
+                //Write length:
+                target.putInt(payload.length);
+            }
+
+            //Copy as much as fits; temporarily narrow the limit to avoid overflow:
+            if(currentOut.remaining() > target.remaining())
+            {
+                int limit = currentOut.limit();
+                currentOut.limit(currentOut.position() + target.remaining());
+                target.put(currentOut);
+                currentOut.limit(limit);
+            }
+            else
+            {
+                target.put(currentOut);
+            }
+
+            if(!currentOut.hasRemaining())
+            {
+                currentOut = null;
+                return true;
+            }
+            return false;
+        }
+  
+        /**
+         * Incrementally unmarshals one frame (type byte, four byte length,
+         * payload) from the source buffer. Partial payloads are accumulated in
+         * {@code currentIn} across calls.
+         *
+         * @param source buffer holding newly received bytes
+         * @return the decoded object once its whole frame has been read, or
+         *         null while more data is needed
+         * @throws IOException if the announced length is negative or larger
+         *         than 1 MiB, the type byte is unknown, or the payload cannot
+         *         be decoded
+         */
+        public Object unMarshal(ByteBuffer source) throws IOException
+        {
+            if(currentIn == null)
+            {
+                //Need the type byte and the four byte length to start a frame:
+                if(source.remaining() < 5)
+                {
+                    return null;
+                }
+
+                byte type = source.get();
+                int length = source.getInt();
+                if(length < 0)
+                {
+                    throw new IOException("Invalid packet length: " + length);
+                }
+                if(length > 1024*1024)
+                {
+                    throw new IOException("Packet exceeded max memory size!");
+                }
+                inType = type;
+                currentIn = ByteBuffer.wrap(new byte[length]);
+            }
+
+            //Copy what is available without reading past the end of this frame.
+            //An empty source or a zero length payload simply copies nothing.
+            if(source.remaining() > currentIn.remaining())
+            {
+                int limit = source.limit();
+                source.limit(source.position() + currentIn.remaining());
+                currentIn.put(source);
+                source.limit(limit);
+            }
+            else
+            {
+                currentIn.put(source);
+            }
+
+            //If we haven't finished the packet return to get more data:
+            if(currentIn.hasRemaining())
+            {
+                return null;
+            }
+
+            //Reset the frame state before decoding so that a decode failure
+            //does not leave a stale buffer behind for the next call:
+            byte[] payload = currentIn.array();
+            currentIn = null;
+
+            switch(inType) {
+            case 0:
+                Commands.Message m = new Commands.Message();
+                try
+                {
+                    m.mergeFramed(payload);
+                }
+                catch(Exception e)
+                {
+                    //Report the failure like the other frame types do instead
+                    //of handing a partially merged message to the caller:
+                    throw IOExceptionSupport.create(e);
+                }
+                return new Message(m);
+            case 1:
+                return new String(payload, "utf-8");
+            case 2:
+                Destination d = new Destination();
+                d.mergeFramed(payload);
+                return d;
+            case 3:
+                FlowControl c = new FlowControl();
+                c.mergeFramed(payload);
+                return c;
+            default:
+                throw new IOException("Unknown type byte: " + inType);
+            }
+        }
+        
         public int getVersion() {
             return 0;
         }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java Mon Feb 16 15:32:50 2009
@@ -7,36 +7,42 @@
 
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.FlowControl;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.flow.MockBroker.DeliveryTarget;
+import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 
 public class RemoteConnection implements TransportListener, DeliveryTarget {
 
-
     protected Transport transport;
     protected MockBroker broker;
 
     protected final Object inboundMutex = new Object();
-    protected FlowController<Message> inboundController;
+    protected IFlowController<Message> inboundController;
+
+    protected SingleFlowRelay<Message> outputQueue;
+    protected IFlowController<Message> outboundController;
+    protected ProtocolLimiter<Message> outboundLimiter;
+    protected Flow ouboundFlow;
 
-    protected final Object outboundMutex = new Object();
-    protected IFlowSink<Message> outboundController;
     protected String name;
 
     private int priorityLevels;
 
     private final int outputWindowSize = 1000;
-    private final int outputResumeThreshold = 500;
+    private final int outputResumeThreshold = 900;
 
     private final int inputWindowSize = 1000;
     private final int inputResumeThreshold = 900;
 
     private IDispatcher dispatcher;
-    private ExecutorService writer;
-
     private final AtomicBoolean stopping = new AtomicBoolean();
+    protected Flow outputFlow;
+    protected boolean blockingTransport = false;
+    ExecutorService blockingWriter;
 
     public void setBroker(MockBroker broker) {
         this.broker = broker;
@@ -53,51 +59,58 @@
 
     public void stop() throws Exception {
         stopping.set(true);
-        writer.shutdown();
         if (transport != null) {
             transport.stop();
         }
+        if (blockingWriter != null) {
+            blockingWriter.shutdown();
+        }
     }
 
     public void onCommand(Object command) {
         try {
+            // System.out.println("Got Command: " + command);
             // First command in should be the name of the connection
-            if( name==null ) {
+            if (name == null) {
                 name = (String) command;
                 initialize();
             } else if (command.getClass() == Message.class) {
                 Message msg = (Message) command;
-                // Use the flow controller to send the message on so that we do
-                // not overflow
-                // the broker.
-                while (!inboundController.offer(msg, null)) {
-                    inboundController.waitForFlowUnblock();
-                }
+                inboundController.add(msg, null);
             } else if (command.getClass() == Destination.class) {
                 // This is a subscription request
                 Destination destination = (Destination) command;
+
                 broker.subscribe(destination, this);
+            } else if (command.getClass() == FlowControl.class) {
+                // This is a flow control (credit) message from the remote peer
+                FlowControl fc = (FlowControl) command;
+                synchronized (outputQueue) {
+                    outboundLimiter.onProtocolMessage(fc);
+                }
+            } else {
+                onException(new Exception("Unrecognized command: " + command));
             }
         } catch (Exception e) {
             onException(e);
         }
     }
 
-    private void initialize() {
+    protected void initialize() {
         // Setup the input processing..
-        SizeLimiter<Message> limiter = new SizeLimiter<Message>(inputWindowSize, inputResumeThreshold);
-        Flow flow = new Flow(name + "-inbound", false);
+        Flow flow = new Flow(name, false);
+        WindowLimiter<Message> limiter = new WindowLimiter<Message>(false, flow, inputWindowSize, inputResumeThreshold);
+
         inboundController = new FlowController<Message>(new FlowControllable<Message>() {
             public void flowElemAccepted(ISourceController<Message> controller, Message elem) {
-                broker.router.route(controller, elem);
-                inboundController.elementDispatched(elem);
+                messageReceived(controller, elem);
             }
 
             @Override
             public String toString() {
                 return name;
             }
-            
+
             public IFlowSink<Message> getFlowSink() {
                 return null;
             }
@@ -107,16 +120,69 @@
             }
         }, flow, limiter, inboundMutex);
 
-        // Setup output processing
-        writer = Executors.newSingleThreadExecutor();
-        FlowControllable<Message> controllable = new FlowControllable<Message>(){
-            public void flowElemAccepted(final ISourceController<Message> controller, final Message elem) {
-                writer.execute(new Runnable() {
+        ouboundFlow = new Flow(name, false);
+        outboundLimiter = new WindowLimiter<Message>(true, ouboundFlow, outputWindowSize, outputResumeThreshold);
+        outputQueue = new SingleFlowRelay<Message>(ouboundFlow, name + "-outbound", outboundLimiter);
+        outboundController = outputQueue.getFlowController(ouboundFlow);
+
+        if (transport instanceof DispatchableTransport) {
+            outputQueue.setDrain(new IFlowDrain<Message>() {
+
+                public void drain(Message message, ISourceController<Message> controller) {
+                    write(message);
+                }
+            });
+
+        } else {
+            blockingTransport = true;
+            blockingWriter = Executors.newSingleThreadExecutor();
+            outputQueue.setDrain(new IFlowDrain<Message>() {
+                public void drain(final Message message, ISourceController<Message> controller) {
+                    write(message);
+                };
+            });
+            /*
+             * // Setup output processing final Executor writer =
+             * Executors.newSingleThreadExecutor(); FlowControllable<Message>
+             * controllable = new FlowControllable<Message>() { public void
+             * flowElemAccepted( final ISourceController<Message> controller,
+             * final Message elem) { writer.execute(new Runnable() { public void
+             * run() { if (!stopping.get()) { try { transport.oneway(elem);
+             * controller.elementDispatched(elem); } catch (IOException e) {
+             * onException(e); } } } }); }
+             * 
+             * public IFlowSink<Message> getFlowSink() { return null; }
+             * 
+             * public IFlowSource<Message> getFlowSource() { return null; } };
+             * 
+             * if (priorityLevels <= 1) { outboundController = new
+             * FlowController<Message>(controllable, flow, limiter,
+             * outboundMutex); } else { PrioritySizeLimiter<Message> pl = new
+             * PrioritySizeLimiter<Message>( outputWindowSize,
+             * outputResumeThreshold, priorityLevels);
+             * pl.setPriorityMapper(Message.PRIORITY_MAPPER); outboundController
+             * = new PriorityFlowController<Message>( controllable, flow, pl,
+             * outboundMutex); }
+             */
+        }
+        // outputQueue.setDispatcher(dispatcher);
+
+    }
+
+    private final void write(final Object o) {
+        synchronized (outputQueue) {
+            if (!blockingTransport) {
+                try {
+                    transport.oneway(o);
+                } catch (IOException e) {
+                    onException(e);
+                }
+            } else {
+                blockingWriter.execute(new Runnable() {
                     public void run() {
                         if (!stopping.get()) {
                             try {
-                                transport.oneway(elem);
-                                controller.elementDispatched(elem);
+                                transport.oneway(o);
                             } catch (IOException e) {
                                 onException(e);
                             }
@@ -124,33 +190,21 @@
                     }
                 });
             }
-            public IFlowSink<Message> getFlowSink() {
-                return null;
-            }
-            public IFlowSource<Message> getFlowSource() {
-                return null;
-            }
-        };
-
-        flow = new Flow(name + "-outbound", false);
-        if (priorityLevels <= 1) {
-            limiter = new SizeLimiter<Message>(outputWindowSize, outputResumeThreshold);
-            outboundController = new FlowController<Message>(controllable, flow, limiter,  outboundMutex);
-        } else {
-            PrioritySizeLimiter<Message> pl = new PrioritySizeLimiter<Message>(outputWindowSize, outputResumeThreshold, priorityLevels);
-            pl.setPriorityMapper(Message.PRIORITY_MAPPER);
-            outboundController = new PriorityFlowController<Message>(controllable, flow, pl,  outboundMutex);
         }
+    }
 
+    protected void messageReceived(ISourceController<Message> controller, Message elem) {
+        broker.router.route(controller, elem);
+        inboundController.elementDispatched(elem);
     }
 
     public void onException(IOException error) {
-        onException((Exception)error);
+        onException((Exception) error);
     }
 
     public void onException(Exception error) {
         if (!stopping.get() && !broker.isStopping()) {
-            System.out.println("RemoteConnection error: "+error);
+            System.out.println("RemoteConnection error: " + error);
             error.printStackTrace();
         }
     }
@@ -179,6 +233,13 @@
 
     public void setDispatcher(IDispatcher dispatcher) {
         this.dispatcher = dispatcher;
+        if (transport instanceof DispatchableTransport) {
+            DispatchableTransport dt = ((DispatchableTransport) transport);
+            if (name != null) {
+                dt.setName(name);
+            }
+            dt.setDispatcher(getDispatcher());
+        }
     }
 
     public MockBroker getBroker() {
@@ -202,11 +263,66 @@
     }
 
     public IFlowSink<Message> getSink() {
-        return outboundController;
+        return outputQueue;
     }
 
     public boolean match(Message message) {
         return true;
     }
 
+    private interface ProtocolLimiter<E> extends IFlowLimiter<E> { // a flow limiter that also reacts to FlowControl commands
+        public void onProtocolMessage(FlowControl m); // invoked from onCommand() when a FlowControl command arrives
+    }
+
+    /**
+     * Size limiter that takes part in the FlowControl credit protocol.
+     * <p>
+     * When {@code clientMode} is false, space freed through
+     * {@link #remove(int)} is accumulated in {@code available}; once it
+     * reaches {@code capacity - resumeThreshold} a FlowControl command carrying
+     * that credit is written to the transport and the counter is reset. When
+     * {@code clientMode} is true no credits are sent. In either mode a received
+     * FlowControl command releases its credit through
+     * {@link #onProtocolMessage(FlowControl)}.
+     */
+    private class WindowLimiter<E> extends SizeLimiter<E> implements ProtocolLimiter<E> {
+        final Flow flow;
+        final boolean clientMode;
+        // Space freed since the last FlowControl command was sent (non client mode only).
+        private int available;
+
+        public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) {
+            super(capacity, resumeThreshold);
+            this.clientMode = clientMode;
+            this.flow = flow;
+        }
+
+        public void reserve(E elem) {
+            super.reserve(elem);
+        }
+
+        public void releaseReserved(E elem) {
+            // Undo the reservation; this previously delegated to reserve(),
+            // which reserved the element a second time instead of releasing it.
+            super.releaseReserved(elem);
+        }
+
+        protected void remove(int size) {
+            super.remove(size);
+            if (!clientMode) {
+                available += size;
+                if (available >= capacity - resumeThreshold) {
+                    // Hand the accumulated credit back to the remote peer.
+                    FlowControl fc = new FlowControl();
+                    fc.setCredit(available);
+                    write(fc);
+                    available = 0;
+                }
+            }
+        }
+
+        public void onProtocolMessage(FlowControl m) {
+            remove(m.getCredit());
+        }
+
+        // NOTE(review): the parameter type is Message rather than E, so this
+        // looks like an overload, not an override, of any getElementSize(E)
+        // declared by the superclass - confirm it is actually being called.
+        public int getElementSize(Message m) {
+            return m.getFlowLimiterSize();
+        }
+    }
+
 }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java Mon Feb 16 15:32:50 2009
@@ -2,23 +2,20 @@
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportListener;
 
-public class RemoteConsumer implements TransportListener {
+public class RemoteConsumer extends RemoteConnection{
 
-    private final AtomicBoolean stopping = new AtomicBoolean();
     private final MetricCounter consumerRate = new MetricCounter();
 
-    private Transport transport;
-    private MockBroker broker;
-    private String name;
     private MetricAggregator totalConsumerRate;
     private long thinkTime;
     private Destination destination;
@@ -30,6 +27,12 @@
 
         URI uri = broker.getConnectURI();
         transport = TransportFactory.compositeConnect(uri);
+        if(transport instanceof DispatchableTransport)
+        {
+            DispatchableTransport dt = ((DispatchableTransport)transport);
+            dt.setName(name);
+            dt.setDispatcher(getDispatcher());
+        }
         transport.setTransportListener(this);
         transport.start();
         
@@ -37,58 +40,31 @@
         transport.oneway(name);
         // Sending the destination acts as the subscribe.
         transport.oneway(destination);
+        super.initialize();
     }
     
-    public void stop() throws Exception {
-        stopping.set(true);
-        if( transport!=null ) {
-            transport.stop();
-            transport=null;
-        }
-    }
-
-    public void onCommand(Object command) {
-        if( command.getClass() == Message.class ) {
-            
-            if (thinkTime > 0) {
-                try {
-                    Thread.sleep(thinkTime);
-                } catch (InterruptedException e) {
+    protected void messageReceived(final ISourceController<Message> controller, final Message elem) {
+	    if (thinkTime > 0) {
+	        getDispatcher().schedule(new Runnable(){
+
+                public void run() {
+                    consumerRate.increment();
+                    controller.elementDispatched(elem);
                 }
-            }
-            consumerRate.increment();
+	            
+	        }, thinkTime, TimeUnit.MILLISECONDS);
             
-        } else {
-            System.out.println("Unhandled command: "+command);
-        }
-    }
-
-    public void onException(IOException error) {
-        if( !stopping.get() ) {
-            System.out.println("RemoteConsumer error: "+error);
-            error.printStackTrace();
         }
-    }
-
-    public void transportInterupted() {
-    }
-    public void transportResumed() {
+	    else
+	    {
+	        consumerRate.increment();
+	        controller.elementDispatched(elem);
+	    }
     }
 
     public void setName(String name) {
         this.name = name;
     }
-    public void setBroker(MockBroker broker) {
-        this.broker = broker;
-    }
-
-    public MockBroker getBroker() {
-        return broker;
-    }
-
-    public String getName() {
-        return name;
-    }
 
     public MetricAggregator getTotalConsumerRate() {
         return totalConsumerRate;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Mon Feb 16 15:32:50 2009
@@ -5,22 +5,21 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
 import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.ISinkController.FlowUnblockListener;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportListener;
 
-public class RemoteProducer implements TransportListener, Runnable {
+public class RemoteProducer extends RemoteConnection implements Dispatchable, FlowUnblockListener<Message>{
 
-    private final AtomicBoolean stopping = new AtomicBoolean();
     private final MetricCounter rate = new MetricCounter();
 
-    private Transport transport;
-    private MockBroker broker;
-    private String name;
-    private Thread thread;
     private AtomicLong messageIdGenerator;
     private int priority;
     private int priorityMod;
@@ -29,6 +28,8 @@
     private Destination destination;
     private String property;
     private MetricAggregator totalProducerRate;
+    Message next;
+    private DispatchContext dispatchContext;
     
     public void start() throws Exception {
         rate.name("Producer " + name + " Rate");
@@ -37,68 +38,67 @@
         URI uri = broker.getConnectURI();
         transport = TransportFactory.compositeConnect(uri);
         transport.setTransportListener(this);
+        if(transport instanceof DispatchableTransport)
+        {
+            DispatchableTransport dt = ((DispatchableTransport)transport);
+            dt.setName(name);
+            dt.setDispatcher(getDispatcher());
+        }
+        super.setTransport(transport);
+       
+        super.initialize();
         transport.start();
-        
         // Let the remote side know our name.
         transport.oneway(name);
-
-        thread = new Thread(this, name);
-        thread.start();
+        dispatchContext = getDispatcher().register(this, name + "-producer");
+        dispatchContext.requestDispatch();
     }
     
-    public void stop() throws Exception {
-        stopping.set(true);
-        if( transport!=null ) {
-            transport.stop();
-        }
-        thread.join();
-        transport=null;
-    }
-
-    public void run() {
-        try {
-            while( !stopping.get() ) {
-                
-                int priority = this.priority;
-                if (priorityMod > 0) {
-                    priority = counter % priorityMod == 0 ? 0 : priority;
-                }
-
-                Message next = new Message(messageIdGenerator.getAndIncrement(), producerId, name + ++counter, null, destination, priority);
-                if (property != null) {
-                    next.setProperty(property);
-                }
-                
-                transport.oneway(next);
-                rate.increment();
-            }
-        } catch (IOException e) {
-            onException(e);
-        }
-    }
-
-    public void onCommand(Object command) {
-        System.out.println("Unhandled command: "+command);
-    }
-
-    public void onException(IOException error) {
-        if( !stopping.get() ) {
-            System.out.println("RemoteProducer error: "+error);
-            error.printStackTrace();
-        }
-    }
-
-    public void transportInterupted() {
-    }
-    public void transportResumed() {
-    }
-
-    public void setName(String name) {
+    public void stop() throws Exception
+    {
+    	dispatchContext.close();
+    	super.stop();
+    }
+
+	public void onFlowUnblocked(ISinkController<Message> controller) {
+		dispatchContext.requestDispatch();
+	}
+
+	public boolean dispatch() {
+		while(true)
+		{
+			
+			if(next == null)
+			{
+	            int priority = this.priority;
+	            if (priorityMod > 0) {
+	                priority = counter % priorityMod == 0 ? 0 : priority;
+	            }
+	
+	            next = new Message(messageIdGenerator.getAndIncrement(), producerId, name + ++counter, null, destination, priority);
+	            if (property != null) {
+	                next.setProperty(property);
+	            }
+			}
+	        
+			//If flow controlled stop until flow control is lifted.
+			if(outboundController.isSinkBlocked())
+			{
+				if(outboundController.addUnblockListener(this))
+				{
+					return true;
+				}
+			}
+			
+	        getSink().add(next, null);
+	        rate.increment();
+	        next = null;
+		}
+	}
+	
+	public void setName(String name) {
         this.name = name;
     }
-    public void setBroker(MockBroker broker) {
-        this.broker = broker;
-    }
 
     public AtomicLong getMessageIdGenerator() {
         return messageIdGenerator;
@@ -148,14 +148,6 @@
         this.property = property;
     }
 
-    public MockBroker getBroker() {
-        return broker;
-    }
-
-    public String getName() {
-        return name;
-    }
-
     public MetricAggregator getTotalProducerRate() {
         return totalProducerRate;
     }
@@ -166,4 +158,6 @@
 
     public MetricCounter getRate() {
         return rate;
-    }}
+    }
+}
+