You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/16 16:32:53 UTC

svn commit: r744939 [1/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/flow/ main/java/org/apache/activemq/queue/ main/java/org/apache/activemq/transport/ main/java/org/apache/activemq/transport/nio/ main/java/org/apache/acti...

Author: chirino
Date: Mon Feb 16 15:32:50 2009
New Revision: 744939

URL: http://svn.apache.org/viewvc?rev=744939&view=rev
Log:
Applying Colin's patch at https://issues.apache.org/activemq/browse/AMQ-2115. In summary:

Along with some other refactoring, these changes are an incremental step towards supporting a non-blocking I/O environment conducive to a single/low thread count broker.

Listing of changes:
1. Introduced org.apache.activemq.transport.DispatchableTransport and DispatchableTransportServer. These can be provided with a dispatcher and operate in a non-blocking fashion. They need to be backed by a FlowController to avoid overflow.
2. Changed PipeTransportFactory to operate in non blocking fashion by implementing above interfaces.
3. Added a copy of the org.apache.activemq.transport.nio package, which is being refactored so that it can be used with the above interfaces.
4. Modified RemoteProducer, RemoteConsumer and BrokerConnection to extend RemoteConnection to take advantage of common network level changes described below.
5. Changed RemoteConnection to incorporate a network WindowLimiter which asserts flow control via the protocol instead of relying on transport-level flow control. Also updated RemoteConnection to take a Dispatcher. Deleted AbstractTestConnection.
6. Changed FlowController and PriorityFlowController not to implement IFlowSink. Instead, introduced org.apache.activemq.queue.SingleFlowRelay, which wraps the FlowController and acts as a Sink/Source that does not queue the messages passing through it; this now acts as the output sink for RemoteConnection.
7. Modified test.proto ProtocolBuffers definition to include new FlowControl message used to communicate space released by the flow controller in support of using protocol based flowcontrol instead of transport based flow control.
8. Introduced org.apache.activemq.wireformat.StatefulWireFormat, which allows stateful non-blocking marshalling/unmarshalling. Changed ProtoWireFormatFactory.TestWireformat to implement this so that non-blocking I/O can be achieved.


Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/DispatchableTransport.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/DispatchableTransportServer.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
    activemq/sandbox/activemq-flow/src/main/proto/test.proto
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java Mon Feb 16 15:32:50 2009
@@ -21,7 +21,7 @@
 
 public abstract class AbstractLimitedFlowResource<E> implements IFlowResource {
     private final HashSet<FlowLifeCycleListener> lifeCycleWatchers = new HashSet<FlowLifeCycleListener>();
-    private final HashMap<Flow, FlowController<E>> openControllers = new HashMap<Flow, FlowController<E>>();
+    private final HashMap<Flow, IFlowController<E>> openControllers = new HashMap<Flow, IFlowController<E>>();
 
     private final long resourceId = RESOURCE_COUNTER.incrementAndGet();
 
@@ -50,7 +50,7 @@
     public synchronized final void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
         lifeCycleWatchers.add(listener);
         // Notify the watchers of all flows that are already open:
-        for (FlowController<E> controller : openControllers.values()) {
+        for (IFlowController<E> controller : openControllers.values()) {
             listener.onFlowOpened(this, controller.getFlow());
         }
     }
@@ -62,8 +62,8 @@
      * @param controller
      *            The new controller.
      */
-    protected synchronized final void onFlowOpened(FlowController<E> controller) {
-        FlowController<E> existing = openControllers.put(controller.getFlow(), controller);
+    protected synchronized final void onFlowOpened(IFlowController<E> controller) {
+    	IFlowController<E> existing = openControllers.put(controller.getFlow(), controller);
         if (existing != null && existing != controller) {
             // Put the existing controller back:
             openControllers.put(controller.getFlow(), existing);
@@ -76,7 +76,7 @@
     }
 
     protected synchronized final void onFlowClosed(Flow flow) {
-        FlowController<E> existing = openControllers.remove(flow);
+    	IFlowController<E> existing = openControllers.remove(flow);
 
         if (existing != null) {
             for (FlowLifeCycleListener listener : lifeCycleWatchers) {
@@ -96,7 +96,7 @@
      *            The flow
      * @return The FlowController
      */
-    public synchronized FlowController<E> getFlowController(Flow flow) {
+    public synchronized IFlowController<E> getFlowController(Flow flow) {
         return openControllers.get(flow);
     }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Mon Feb 16 15:32:50 2009
@@ -25,7 +25,7 @@
 
 /**
  */
-public class FlowController<E> implements ISinkController<E>, ISourceController<E> {
+public class FlowController<E> implements IFlowController<E> {
 
     // Sinks that are blocking us.
     private final HashSet<ISinkController<E>> blockingSinks = new HashSet<ISinkController<E>>();
@@ -438,34 +438,4 @@
     public IFlowSink<E> getFlowSink() {
         return controllable.getFlowSink();
     }
-
-    public long getResourceId() {
-        IFlowSink<E> flowSink = getFlowSink();
-        if( flowSink!=null ) {
-            return flowSink.getResourceId();
-        }
-        return 0;
-    }
-
-    public String getResourceName() {
-        IFlowSink<E> flowSink = getFlowSink();
-        if( flowSink!=null ) {
-            return flowSink.getResourceName();
-        }
-        return null;
-    }
-
-    public void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
-        IFlowSink<E> flowSink = getFlowSink();
-        if( flowSink!=null ) {
-            flowSink.addFlowLifeCycleListener(listener);
-        }
-    }
-    
-    public void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
-        IFlowSink<E> flowSink = getFlowSink();
-        if( flowSink!=null ) {
-            flowSink.removeFlowLifeCycleListener(listener);
-        }
-    }
 }

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowController.java?rev=744939&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowController.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowController.java Mon Feb 16 15:32:50 2009
@@ -0,0 +1,5 @@
+package org.apache.activemq.flow;
+
+/**
+ * A flow controller that is both an {@link ISourceController} and an
+ * {@link ISinkController} for elements of type E. The interface declares no
+ * members of its own; it only combines the two controller contracts.
+ */
+public interface IFlowController<E> extends ISourceController<E>, ISinkController<E> {
+
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java Mon Feb 16 15:32:50 2009
@@ -28,7 +28,7 @@
      *            The flow.
      * @return The flow controller for the specified flow.
      */
-    public FlowController<E> getFlowController(Flow flow);
+    public IFlowController<E> getFlowController(Flow flow);
 
     /**
      * If set to true the source will automatically release limiter space

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java Mon Feb 16 15:32:50 2009
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.flow;
 
-public interface ISinkController<E> extends IFlowSink<E> {
+public interface ISinkController<E> {
     /**
      * Defines required attributes for an entity that can be flow controlled.
      * 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java Mon Feb 16 15:32:50 2009
@@ -89,34 +89,4 @@
     public IFlowSink<E> getFlowSink() {
         return null;
     }
-
-    public long getResourceId() {
-        IFlowSink<E> flowSink = getFlowSink();
-        if( flowSink!=null ) {
-            return flowSink.getResourceId();
-        }
-        return 0;
-    }
-
-    public String getResourceName() {
-        IFlowSink<E> flowSink = getFlowSink();
-        if( flowSink!=null ) {
-            return flowSink.getResourceName();
-        }
-        return null;
-    }
-
-    public void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
-        IFlowSink<E> flowSink = getFlowSink();
-        if( flowSink!=null ) {
-            flowSink.addFlowLifeCycleListener(listener);
-        }
-    }
-    
-    public void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
-        IFlowSink<E> flowSink = getFlowSink();
-        if( flowSink!=null ) {
-            flowSink.removeFlowLifeCycleListener(listener);
-        }
-    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java Mon Feb 16 15:32:50 2009
@@ -18,7 +18,7 @@
 
 import java.util.ArrayList;
 
-public class PriorityFlowController<E> implements ISourceController<E>, ISinkController<E> {
+public class PriorityFlowController<E> implements IFlowController<E> {
 
     private final Object mutex;
     private final ArrayList<FlowController<E>> controllers;
@@ -70,37 +70,6 @@
         throw new UnsupportedOperationException();
     }
     
-    public long getResourceId() {
-        IFlowSink<E> flowSink = getFlowSink();
-        if( flowSink!=null ) {
-            return flowSink.getResourceId();
-        }
-        return 0;
-    }
-
-    public String getResourceName() {
-        IFlowSink<E> flowSink = getFlowSink();
-        if( flowSink!=null ) {
-            return flowSink.getResourceName();
-        }
-        return null;
-    }
-
-    public void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
-        IFlowSink<E> flowSink = getFlowSink();
-        if( flowSink!=null ) {
-            flowSink.addFlowLifeCycleListener(listener);
-        }
-    }
-    
-    public void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
-        IFlowSink<E> flowSink = getFlowSink();
-        if( flowSink!=null ) {
-            flowSink.removeFlowLifeCycleListener(listener);
-        }
-    }
-
-
     // /////////////////////////////////////////////////////////////////
     // ISourceController interface impl.
     // /////////////////////////////////////////////////////////////////

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java Mon Feb 16 15:32:50 2009
@@ -31,7 +31,7 @@
         this.resumeThreshold = resumeThreshold;
     }
 
-    public final boolean add(E elem) {
+    public boolean add(E elem) {
         this.size += getElementSize(elem);
 
         if (this.size >= capacity) {
@@ -102,4 +102,8 @@
     public int getSize() {
         return size;
     }
+
+    public String toString() {
+        return "SizeLimiter " + capacity + "/" + resumeThreshold + ", s=" + size + " res=" + reserved + ", thr= " + throttled;
+    }
 }

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java?rev=744939&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java Mon Feb 16 15:32:50 2009
@@ -0,0 +1,48 @@
+package org.apache.activemq.queue;
+
+import org.apache.activemq.flow.AbstractLimitedFlowSource;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowController;
+import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+
+/**
+ * A relay that is at once the sink, the source and the
+ * {@link FlowControllable} of a single flow. Everything added or offered to
+ * the relay is delegated to one {@link FlowController} whose overflow queue
+ * is switched off, and each element that controller accepts is handed to
+ * {@code drain}.
+ */
+public class SingleFlowRelay<E> extends AbstractLimitedFlowSource<E>
+        implements IFlowSink<E>, IFlowSource<E>, FlowControllable<E> {
+
+    /** The relay's only controller; add/offer calls are delegated to it. */
+    private final IFlowController<E> controller;
+
+    /**
+     * Creates the relay and registers its controller as an opened flow.
+     *
+     * @param flow    the flow given to the relay's controller
+     * @param name    the name passed to the superclass constructor
+     * @param limiter the limiter given to the relay's controller
+     */
+    public SingleFlowRelay(Flow flow, String name, IFlowLimiter<E> limiter) {
+        super(name);
+        FlowController<E> relayController = new FlowController<E>(this, flow, limiter, this);
+        relayController.useOverFlowQueue(false);
+        this.controller = relayController;
+        super.onFlowOpened(this.controller);
+    }
+
+    /** Delegates the element and its source to the relay's controller. */
+    public void add(E elem, ISourceController<E> source) {
+        this.controller.add(elem, source);
+    }
+
+    /**
+     * Offers the element to the relay's controller.
+     *
+     * @return whatever the controller's {@code offer} returns
+     */
+    public boolean offer(E elem, ISourceController<E> source) {
+        final boolean accepted = this.controller.offer(elem, source);
+        return accepted;
+    }
+
+    /**
+     * Forwards an accepted element, together with the controller passed in,
+     * to {@code drain}.
+     */
+    public void flowElemAccepted(ISourceController<E> controller, E elem) {
+        // NOTE(review): 'drain' is not declared in this class; presumably it is
+        // inherited from AbstractLimitedFlowSource - confirm it is set before use.
+        drain.drain(elem, controller);
+    }
+
+    /** @return this relay, which is its own sink */
+    public IFlowSink<E> getFlowSink() {
+        return this;
+    }
+
+    /** @return this relay, which is its own source */
+    public IFlowSource<E> getFlowSource() {
+        return this;
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/DispatchableTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/DispatchableTransport.java?rev=744939&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/DispatchableTransport.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/DispatchableTransport.java Mon Feb 16 15:32:50 2009
@@ -0,0 +1,9 @@
+package org.apache.activemq.transport;
+
+import org.apache.activemq.dispatch.IDispatcher;
+
+/**
+ * A {@link Transport} that can additionally be given an {@link IDispatcher}
+ * and a name.
+ */
+public interface DispatchableTransport extends Transport {
+
+    /**
+     * @param dispatcher the dispatcher handed to this transport
+     */
+    void setDispatcher(IDispatcher dispatcher);
+
+    /**
+     * @param name the name assigned to this transport
+     */
+    void setName(String name);
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/DispatchableTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/DispatchableTransportServer.java?rev=744939&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/DispatchableTransportServer.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/DispatchableTransportServer.java Mon Feb 16 15:32:50 2009
@@ -0,0 +1,8 @@
+package org.apache.activemq.transport;
+
+import org.apache.activemq.dispatch.IDispatcher;
+
+/**
+ * A {@link TransportServer} that can additionally be given an
+ * {@link IDispatcher}.
+ */
+public interface DispatchableTransportServer extends TransportServer {
+
+    /**
+     * @param dispatcher the dispatcher handed to this transport server
+     */
+    void setDispatcher(IDispatcher dispatcher);
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java?rev=744939&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java Mon Feb 16 15:32:50 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * An {@link InputStream} view over a {@link ByteBuffer}: every read or skip
+ * consumes the buffer's remaining bytes, and end-of-stream is reported once
+ * the buffer has nothing remaining.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class NIOInputStream extends InputStream {
+
+    // Not used by this class; retained because the protected fields are
+    // visible to subclasses.
+    protected int count;
+    protected int position;
+    private final ByteBuffer in;
+
+    /**
+     * @param in the buffer to read from; its position advances as the stream
+     *            is consumed
+     */
+    public NIOInputStream(ByteBuffer in) {
+        this.in = in;
+    }
+
+    /**
+     * Reads the next byte of the buffer.
+     * 
+     * @return the byte as a value in the range 0-255, or -1 when the buffer
+     *         has no bytes remaining
+     */
+    public int read() throws IOException {
+        // Check for remaining data up front instead of relying on
+        // BufferUnderflowException for control flow.
+        if (!in.hasRemaining()) {
+            return -1;
+        }
+        return in.get() & 0xff;
+    }
+
+    /**
+     * Copies up to len bytes from the buffer into b, starting at off.
+     * 
+     * @return the number of bytes copied; 0 when len is 0; -1 when the buffer
+     *         has no bytes remaining and len is not 0
+     */
+    public int read(byte b[], int off, int len) throws IOException {
+        if (in.hasRemaining()) {
+            int rc = Math.min(len, in.remaining());
+            in.get(b, off, rc);
+            return rc;
+        } else {
+            return len == 0 ? 0 : -1;
+        }
+    }
+
+    /**
+     * Skips over at most n bytes of the buffer.
+     * 
+     * @param n the number of bytes to skip; zero or negative values skip
+     *            nothing
+     * @return the number of bytes actually skipped, never negative
+     */
+    public long skip(long n) throws IOException {
+        if (n <= 0) {
+            return 0;
+        }
+        // Clamp while still in long arithmetic: narrowing n to int first would
+        // wrap large values to negative numbers and move the position backwards.
+        int rc = (int)Math.min(n, (long)in.remaining());
+        in.position(in.position() + rc);
+        return rc;
+    }
+
+    /**
+     * @return the number of bytes remaining in the buffer
+     */
+    public int available() throws IOException {
+        return in.remaining();
+    }
+
+    /**
+     * @return false; mark/reset is not supported
+     */
+    public boolean markSupported() {
+        return false;
+    }
+
+    /**
+     * Does nothing; the underlying buffer is left untouched.
+     */
+    public void close() throws IOException {
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java?rev=744939&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java Mon Feb 16 15:32:50 2009
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A buffered {@link OutputStream} that writes to a
+ * {@link WritableByteChannel}. Bytes are collected in an internal buffer and
+ * pushed to the channel when the buffer cannot hold the next write or when
+ * {@link #flush()} is called.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class NIOOutputStream extends OutputStream {
+
+    private static final int BUFFER_SIZE = 8192;
+
+    private final WritableByteChannel out;
+    private final byte[] buffer;
+    // View over 'buffer' used to hand the buffered bytes to the channel.
+    private final ByteBuffer byteBuffer;
+
+    // Number of bytes currently held in 'buffer'.
+    private int count;
+    private boolean closed;
+
+    /**
+     * Creates a stream with the default buffer size of 8192 bytes.
+     * 
+     * @param out the channel to write to
+     */
+    public NIOOutputStream(WritableByteChannel out) {
+        this(out, BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a new buffered output stream that writes data to the specified
+     * channel using a buffer of the specified size.
+     * 
+     * @param out the channel to write to.
+     * @param size the buffer size.
+     * @throws IllegalArgumentException if size <= 0.
+     */
+    public NIOOutputStream(WritableByteChannel out, int size) {
+        this.out = out;
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buffer = new byte[size];
+        byteBuffer = ByteBuffer.wrap(buffer);
+    }
+
+    /**
+     * Writes a single byte to the stream, flushing first if the buffer is
+     * full.
+     * 
+     * @param b - byte to write
+     * @throws IOException if the stream is closed or the flush fails
+     */
+    public void write(int b) throws IOException {
+        checkClosed();
+        if (availableBufferToWrite() < 1) {
+            flush();
+        }
+        buffer[count++] = (byte)b;
+    }
+
+    /**
+     * Writes part of a byte array to the stream. Data that fits in the buffer
+     * is buffered; data larger than the whole buffer is written straight to
+     * the channel after the buffer has been flushed.
+     * 
+     * @param b the byte array
+     * @param off the offset into the array
+     * @param len the length of data to write
+     * @throws IOException if the stream is closed or a channel write fails
+     */
+    public void write(byte b[], int off, int len) throws IOException {
+        checkClosed();
+        if (availableBufferToWrite() < len) {
+            flush();
+        }
+        if (buffer.length >= len) {
+            System.arraycopy(b, off, buffer, count, len);
+            count += len;
+        } else {
+            write(ByteBuffer.wrap(b, off, len));
+        }
+    }
+
+    /**
+     * Writes any buffered bytes to the channel and empties the buffer. Does
+     * nothing when the buffer is empty or there is no channel.
+     * 
+     * @throws IOException if the channel write fails
+     */
+    public void flush() throws IOException {
+        if (count > 0 && out != null) {
+            byteBuffer.position(0);
+            byteBuffer.limit(count);
+            write(byteBuffer);
+            count = 0;
+        }
+    }
+
+    /**
+     * Marks this stream as closed so that later writes fail. Buffered bytes
+     * are not flushed and the channel is not closed here.
+     * 
+     * @throws IOException
+     */
+    public void close() throws IOException {
+        super.close();
+        closed = true;
+    }
+
+    /**
+     * Checks that the stream has not been closed
+     * 
+     * @throws IOException (an {@link EOFException}) if the stream is closed
+     */
+    protected void checkClosed() throws IOException {
+        if (closed) {
+            throw new EOFException("Cannot write to the stream any more it has already been closed");
+        }
+    }
+
+    /**
+     * @return the amount of free space in the buffer
+     */
+    private int availableBufferToWrite() {
+        return buffer.length - count;
+    }
+
+    /**
+     * Writes the whole of data to the channel, retrying until nothing
+     * remains. When an attempt writes no bytes, the thread sleeps before the
+     * next attempt, doubling the delay each time up to one second.
+     * 
+     * @param data the bytes to write
+     * @throws InterruptedIOException if the thread is interrupted while
+     *             sleeping; the thread's interrupt status is restored first
+     * @throws IOException if the channel write fails
+     */
+    protected void write(ByteBuffer data) throws IOException {
+        int remaining = data.remaining();
+        // Start one lower so the first pass never looks like a stalled write.
+        int lastRemaining = remaining - 1;
+        long delay = 1;
+        while (remaining > 0) {
+
+            // We may need to do a little bit of sleeping to avoid a busy loop.
+            // Slow down if no data was written out..
+            if (remaining == lastRemaining) {
+                try {
+                    // Use exponential backoff to increase sleep time.
+                    Thread.sleep(delay);
+                    delay *= 2;
+                    if (delay > 1000) {
+                        delay = 1000;
+                    }
+                } catch (InterruptedException e) {
+                    // Restore the interrupt status so callers further up the
+                    // stack can still observe the interruption.
+                    Thread.currentThread().interrupt();
+                    InterruptedIOException interrupted = new InterruptedIOException();
+                    interrupted.initCause(e);
+                    throw interrupted;
+                }
+            } else {
+                delay = 1;
+            }
+            lastRemaining = remaining;
+
+            // Since the write is non-blocking, all the data may not have been
+            // written.
+            out.write(data);
+            remaining = data.remaining();
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?rev=744939&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java Mon Feb 16 15:32:50 2009
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import javax.net.SocketFactory;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * An implementation of the {@link Transport} interface that reads frames from a
+ * non-blocking {@link SocketChannel} each time its selector registration fires.
+ * @version $Revision$
+ */
+public class NIOTransport extends TcpTransport {
+
+    // private static final Log log = LogFactory.getLog(NIOTransport.class);
+    private SocketChannel channel;
+    private SelectorSelection selection; // registration returned by SelectorManager.register()
+    private ByteBuffer inputBuffer; // reusable 8 KiB buffer for the size prefix and for frames that fit in it
+    private ByteBuffer currentBuffer; // buffer being filled: inputBuffer, or a one-off buffer for an oversized frame
+    private int nextFrameSize; // -1 while the 4-byte size prefix is still being read
+
+    public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+        super(wireFormat, socketFactory, remoteLocation, localLocation);
+    }
+
+    public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
+        super(wireFormat, socket);
+    }
+
+    protected void initializeStreams() throws IOException { // NOTE(review): presumably invoked by TcpTransport while connecting - confirm
+        channel = socket.getChannel(); // null unless the socket was created from a SocketChannel (as NIOTransportFactory does)
+        channel.configureBlocking(false);
+
+        // listen for events telling us when the socket is readable.
+        selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
+            public void onSelect(SelectorSelection selection) {
+                serviceRead();
+            }
+
+            public void onError(SelectorSelection selection, Throwable error) {
+                if (error instanceof IOException) {
+                    onException((IOException)error);
+                } else {
+                    onException(IOExceptionSupport.create(error)); // wrap non-IO failures before reporting them
+                }
+            }
+        });
+
+        // Send the data via the channel
+        // inputBuffer = ByteBuffer.allocateDirect(8*1024);
+        inputBuffer = ByteBuffer.allocate(8 * 1024);
+        currentBuffer = inputBuffer;
+        nextFrameSize = -1;
+        currentBuffer.limit(4); // only read the 4-byte size prefix first
+        this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024));
+
+    }
+
+    private void serviceRead() { // drains the channel, dispatching every complete frame, until a read returns 0 or -1
+        try {
+            while (true) {
+
+                int readSize = channel.read(currentBuffer);
+                if (readSize == -1) { // end of stream: report it and cancel the selector registration
+                    onException(new EOFException());
+                    selection.close();
+                    break;
+                }
+                if (readSize == 0) { // nothing more available right now
+                    break;
+                }
+
+                if (currentBuffer.hasRemaining()) { // prefix or frame still incomplete, keep reading
+                    continue;
+                }
+
+                // Are we trying to figure out the size of the next frame?
+                if (nextFrameSize == -1) {
+                    assert inputBuffer == currentBuffer;
+
+                    // If the frame is too big to fit in inputBuffer, then
+                    // allocate a dedicated buffer of the right size
+                    // for it.
+                    inputBuffer.flip();
+                    nextFrameSize = inputBuffer.getInt() + 4; // total size includes the 4-byte prefix; NOTE(review): value is not range-checked
+                    if (nextFrameSize > inputBuffer.capacity()) {
+                        currentBuffer = ByteBuffer.allocate(nextFrameSize);
+                        currentBuffer.putInt(nextFrameSize); // NOTE(review): re-writes the prefix as size + 4, not the value received - confirm the unmarshaller ignores it
+                    } else {
+                        inputBuffer.limit(nextFrameSize); // position is already 4 after getInt(), so the prefix stays in place
+                    }
+
+                } else {
+                    currentBuffer.flip();
+
+                    Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
+                    doConsume((Command)command);
+
+                    nextFrameSize = -1; // go back to reading a size prefix into the shared buffer
+                    inputBuffer.clear();
+                    inputBuffer.limit(4);
+                    currentBuffer = inputBuffer;
+                }
+
+            }
+
+        } catch (IOException e) {
+            onException(e);
+        } catch (Throwable e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    protected void doStart() throws Exception {
+        connect();
+        selection.setInterestOps(SelectionKey.OP_READ);
+        selection.enable(); // start receiving read-readiness callbacks
+    }
+
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        selection.disable(); // NOTE(review): close() is only called on EOF in serviceRead(), so the registration is not released here
+        super.doStop(stopper);
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java?rev=744939&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java Mon Feb 16 15:32:50 2009
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.wireformat.WireFormat;
+
+public class NIOTransportFactory extends TcpTransportFactory { // TCP factory whose sockets are all backed by NIO channels
+
+    protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+        return new TcpTransportServer(this, location, serverSocketFactory) {
+            protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+                return new NIOTransport(format, socket); // accepted sockets get the NIO transport
+            }
+        };
+    }
+
+    protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+        return new NIOTransport(wf, socketFactory, location, localLocation);
+    }
+
+    protected ServerSocketFactory createServerSocketFactory() { // every variant opens a ServerSocketChannel and returns its bound socket
+        return new ServerSocketFactory() {
+            public ServerSocket createServerSocket(int port) throws IOException {
+                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+                serverSocketChannel.socket().bind(new InetSocketAddress(port)); // NOTE(review): channel is not closed if bind() throws
+                return serverSocketChannel.socket();
+            }
+
+            public ServerSocket createServerSocket(int port, int backlog) throws IOException {
+                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+                serverSocketChannel.socket().bind(new InetSocketAddress(port), backlog);
+                return serverSocketChannel.socket();
+            }
+
+            public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException {
+                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+                serverSocketChannel.socket().bind(new InetSocketAddress(ifAddress, port), backlog);
+                return serverSocketChannel.socket();
+            }
+        };
+    }
+
+    protected SocketFactory createSocketFactory() { // every variant opens a SocketChannel so Socket.getChannel() is non-null
+        return new SocketFactory() {
+
+            public Socket createSocket() throws IOException { // unconnected socket; the caller connects it
+                SocketChannel channel = SocketChannel.open();
+                return channel.socket();
+            }
+
+            public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
+                SocketChannel channel = SocketChannel.open();
+                channel.connect(new InetSocketAddress(host, port)); // NOTE(review): channel is not closed if connect() throws
+                return channel.socket();
+            }
+
+            public Socket createSocket(InetAddress address, int port) throws IOException {
+                SocketChannel channel = SocketChannel.open();
+                channel.connect(new InetSocketAddress(address, port));
+                return channel.socket();
+            }
+
+            public Socket createSocket(String address, int port, InetAddress localAddresss, int localPort) throws IOException, UnknownHostException {
+                SocketChannel channel = SocketChannel.open();
+                channel.socket().bind(new InetSocketAddress(localAddresss, localPort)); // bind the local end before connecting
+                channel.connect(new InetSocketAddress(address, port));
+                return channel.socket();
+            }
+
+            public Socket createSocket(InetAddress address, int port, InetAddress localAddresss, int localPort) throws IOException {
+                SocketChannel channel = SocketChannel.open();
+                channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
+                channel.connect(new InetSocketAddress(address, port));
+                return channel.socket();
+            }
+        };
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?rev=744939&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java Mon Feb 16 15:32:50 2009
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.util.LinkedList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * The SelectorManager hands out {@link SelectorSelection}s, assigning each
+ * channel to a {@link SelectorWorker} taken from its list of free workers.
+ * 
+ * We may need to consider running more than one thread to check the selector if
+ * servicing the selector takes too long.
+ * 
+ * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
+ */
+public final class SelectorManager {
+
+    public static final SelectorManager SINGLETON = new SelectorManager();
+
+    private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory() { // runs the SelectorWorker loops
+        public Thread newThread(Runnable r) {
+            Thread rc = new Thread(r);
+            rc.setName("NIO Transport Thread");
+            return rc;
+        }
+    });
+    private Executor channelExecutor = selectorExecutor; // defaults to sharing the selector pool
+    private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>(); // workers that can still take channels
+    private int maxChannelsPerWorker = 64; // read once by each SelectorWorker in its constructor
+    
+    static SelectorManager getInstance() {
+        return SINGLETON;
+    }
+
+    public interface Listener { // callbacks delivered through SelectorSelection.onSelect()/onError()
+        void onSelect(SelectorSelection selector);
+
+        void onError(SelectorSelection selection, Throwable error);
+    }
+
+
+    public synchronized SelectorSelection register(SelectableChannel socketChannel, Listener listener)
+        throws IOException {
+
+        SelectorWorker worker = null;
+        if (freeWorkers.size() > 0) {
+            worker = freeWorkers.getFirst(); // reuse a worker that still has capacity
+        } else {
+            worker = new SelectorWorker(this); // none free: create one and make it available
+            freeWorkers.addFirst(worker);
+        }
+
+        SelectorSelection selection = new SelectorSelection(worker, socketChannel, listener); // constructor bumps the worker's use counter
+        return selection;
+    }
+
+    synchronized void onWorkerFullEvent(SelectorWorker worker) { // worker reached maxChannelsPerWorker
+        freeWorkers.remove(worker);
+    }
+
+    public synchronized void onWorkerEmptyEvent(SelectorWorker worker) { // worker has no registrations left (or its select loop failed)
+        freeWorkers.remove(worker);
+    }
+
+    public synchronized void onWorkerNotFullEvent(SelectorWorker worker) { // a full worker dropped below its limit
+        freeWorkers.add(worker);
+    }
+
+    public Executor getChannelExecutor() {
+        return channelExecutor;
+    }
+
+    public void setChannelExecutor(Executor channelExecutor) {
+        this.channelExecutor = channelExecutor;
+    }
+
+    public int getMaxChannelsPerWorker() {
+        return maxChannelsPerWorker;
+    }
+
+    public void setMaxChannelsPerWorker(int maxChannelsPerWorker) { // only affects workers created after this call
+        this.maxChannelsPerWorker = maxChannelsPerWorker;
+    }
+
+    public Executor getSelectorExecutor() {
+        return selectorExecutor;
+    }
+
+    public void setSelectorExecutor(Executor selectorExecutor) {
+        this.selectorExecutor = selectorExecutor;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java?rev=744939&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java Mon Feb 16 15:32:50 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+
+import org.apache.activemq.transport.nio.SelectorManager.Listener;
+
+/**
+ * @author chirino
+ */
+public final class SelectorSelection { // ties one channel's SelectionKey to its SelectorWorker and Listener
+
+    private final SelectorWorker worker;
+    private final SelectionKey key;
+    private final Listener listener;
+    private boolean useChannelExecutor; // when true the worker dispatches onSelect() on the channel executor
+    private int interest; // interest ops applied by the next enable()
+
+    public SelectorSelection(SelectorWorker worker, SelectableChannel selectableChannel, Listener listener) throws ClosedChannelException {
+        this.worker = worker;
+        this.listener = listener;
+        this.key = selectableChannel.register(worker.selector, 0, this); // registered with no interest ops; this object is the key attachment
+        worker.incrementUseCounter();
+    }
+
+    public void setInterestOps(int ops) { // only stored here; takes effect on enable()
+        interest = ops;
+    }
+
+    public void enable() {
+        key.interestOps(interest);
+        worker.selector.wakeup(); // wake the selector after the interest set changed
+    }
+
+    public void disable() {
+        key.interestOps(0); // stop all readiness notifications for this key
+    }
+
+    public void setUseChannelExecutor(boolean useChannelExecutor) {
+        this.useChannelExecutor = useChannelExecutor;
+    }
+
+    public boolean getUseChannelExecutor() {
+        return useChannelExecutor;
+    }
+
+    public void close() { // releases this registration: decrements the worker's counter and cancels the key
+        worker.decrementUseCounter();
+        key.cancel();
+        worker.selector.wakeup();
+    }
+
+    public void onSelect() {
+        listener.onSelect(this);
+    }
+
+    public void onError(Throwable e) {
+        listener.onError(this, e);
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java?rev=744939&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java Mon Feb 16 15:32:50 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SelectorWorker implements Runnable { // owns one Selector and the loop that services its ready keys
+
+    private static final AtomicInteger NEXT_ID = new AtomicInteger();
+
+    final SelectorManager manager;
+    final Selector selector;
+    final int id = NEXT_ID.getAndIncrement(); // used only in the thread name set by run()
+    final AtomicInteger useCounter = new AtomicInteger(); // number of live SelectorSelections on this worker
+    private final int maxChannelsPerWorker;
+
+    public SelectorWorker(SelectorManager manager) throws IOException {
+        this.manager = manager;
+        selector = Selector.open(); // NOTE(review): no close() for this selector is visible in this class
+        maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
+    }
+
+    void incrementUseCounter() {
+        int use = useCounter.getAndIncrement(); // value before the increment
+        if (use == 0) {
+            manager.getSelectorExecutor().execute(this); // first registration starts the select loop
+        } else if (use + 1 == maxChannelsPerWorker) { // NOTE(review): never reached when maxChannelsPerWorker is 1
+            manager.onWorkerFullEvent(this);
+        }
+    }
+
+    void decrementUseCounter() {
+        int use = useCounter.getAndDecrement(); // value before the decrement
+        if (use == 1) {
+            manager.onWorkerEmptyEvent(this); // last registration gone
+        } else if (use == maxChannelsPerWorker) {
+            manager.onWorkerNotFullEvent(this); // was full, now has room again
+        }
+    }
+
+    boolean isRunning() {
+        return useCounter.get() != 0;
+    }
+
+    public void run() { // select loop; exits once the use counter drops to zero
+
+        String origName = Thread.currentThread().getName();
+        try {
+            Thread.currentThread().setName("Selector Worker: " + id);
+            while (isRunning()) {
+
+                int count = selector.select(10); // 10 ms timeout so isRunning() is re-checked regularly
+                if (count == 0) {
+                    continue;
+                }
+
+                if (!isRunning()) {
+                    return;
+                }
+
+                // Get a java.util.Set containing the SelectionKey objects
+                // for all channels that are ready for I/O.
+                Set keys = selector.selectedKeys();
+
+                for (Iterator i = keys.iterator(); i.hasNext();) {
+                    final SelectionKey key = (SelectionKey)i.next();
+                    i.remove(); // selected keys must be removed by hand or they stay in the set
+
+                    final SelectorSelection s = (SelectorSelection)key.attachment();
+                    try {
+                        s.disable(); // mute the key while its event is being handled
+                        if(s.getUseChannelExecutor())
+                        {
+                            // Kick off another thread to find newly selected keys
+                            // while we process the
+                            // currently selected keys
+                            manager.getChannelExecutor().execute(new Runnable() {
+                                public void run() {
+                                    try {
+                                        s.onSelect();
+                                        s.enable(); // re-arm the key once handling is done
+                                    } catch (Throwable e) {
+                                        s.onError(e);
+                                    }
+                                }
+                            });
+                        }
+                        else
+                        {
+                            s.onSelect(); // NOTE(review): unlike the executor path, enable() is not called here - confirm the listener re-arms the key
+                        }
+
+                    } catch (Throwable e) {
+                        s.onError(e);
+                    }
+
+                }
+
+            }
+        } catch (IOException e) {
+
+            // Don't accept any more selections
+            manager.onWorkerEmptyEvent(this);
+
+            // Notify all the selections that the error occurred.
+            Set keys = selector.keys();
+            for (Iterator i = keys.iterator(); i.hasNext();) {
+                SelectionKey key = (SelectionKey)i.next();
+                SelectorSelection s = (SelectorSelection)key.attachment();
+                s.onError(e);
+            }
+
+        } finally {
+            Thread.currentThread().setName(origName); // the thread belongs to a pool, so restore its name
+        }
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java?rev=744939&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java Mon Feb 16 15:32:50 2009
@@ -0,0 +1,27 @@
+package org.apache.activemq.wireformat;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.wireformat.WireFormat;
+
+public interface StatefulWireFormat extends WireFormat{ // adds ByteBuffer-based marshalling to WireFormat
+
+    /**
+     * Writes a command to the target buffer, returning false if
+     * the command couldn't entirely fit into the target.
+     * @param command the command to write
+     * @param target the buffer to write the command into
+     * @return false if the command did not entirely fit into the target
+     */
+    public boolean marshal(Object command, ByteBuffer target) throws IOException;
+    
+    /**
+     * Unmarshals an object. When the object is read it is returned.
+     * @param source the buffer to read from
+     * @return The object when unmarshalled, null otherwise
+     */
+    public Object unMarshal(ByteBuffer source) throws IOException; // NOTE(review): capitalised differently from the unmarshal(...) call in NIOTransport
+    
+    
+}

Modified: activemq/sandbox/activemq-flow/src/main/proto/test.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/proto/test.proto?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/proto/test.proto (original)
+++ activemq/sandbox/activemq-flow/src/main/proto/test.proto Mon Feb 16 15:32:50 2009
@@ -35,3 +35,6 @@
   repeated string property=7;
 }
 
+message FlowControl { // NOTE(review): presumably the credit message for the protocol-level flow control named in the commit log - confirm
+  required int32 credit = 2; // NOTE(review): the only field uses tag 2, leaving tag 1 unused - confirm intentional
+}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java Mon Feb 16 15:32:50 2009
@@ -1,483 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.flow;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.Service;
-import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
-import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
-import org.apache.activemq.flow.IFlowResource.FlowLifeCycleListener;
-import org.apache.activemq.flow.ISinkController.FlowControllable;
-import org.apache.activemq.queue.ExclusivePriorityQueue;
-import org.apache.activemq.queue.IAsynchronousFlowSource;
-import org.apache.activemq.queue.IBlockingFlowSource;
-import org.apache.activemq.queue.IFlowQueue;
-import org.apache.activemq.queue.IPollableFlowSource;
-import org.apache.activemq.queue.SingleFlowPriorityQueue;
-
-public abstract class AbstractTestConnection implements Service {
-    
-    protected final IFlowQueue<Message> output;
-    protected final NetworkSource input;
-    protected final MockBroker broker;
-    protected final String name;
-    protected final Flow flow;
-
-    private AtomicBoolean running = new AtomicBoolean();
-    private Thread listener;
-    private Thread sender;
-
-    private final int outputQueueSize = 1000;
-    private final int resumeThreshold = 500;
-
-    private final int inputWindowSize = 1000;
-    private final int inputResumeThreshold = 900;
-
-    public static final int BLOCKING = 0;
-    public static final int POLLING = 1;
-    public static final int ASYNC = 2;
-
-    AbstractTestConnection(MockBroker broker, String name, Flow flow, Pipe<Message> p) {
-        this.name = name;
-        this.broker = broker;
-        this.flow = flow;
-
-        // Set up an input source:
-        this.input = new NetworkSource(flow, name + "-INPUT", inputWindowSize, inputResumeThreshold);
-
-        // Setup output queue:
-        if (MockBrokerTest.PRIORITY_LEVELS <= 1) {
-            this.output = TestFlowManager.createFlowQueue(flow, name + "-OUTPUT", outputQueueSize, resumeThreshold);
-        } else {
-            PrioritySizeLimiter<Message> pl = new PrioritySizeLimiter<Message>(outputQueueSize, resumeThreshold, MockBrokerTest.PRIORITY_LEVELS);
-            pl.setPriorityMapper(Message.PRIORITY_MAPPER);
-            ExclusivePriorityQueue<Message> t = new ExclusivePriorityQueue<Message>(flow, name + "-OUTPUT", pl);
-            this.output = t;
-        }
-
-        output.setDrain(new IFlowDrain<Message>() {
-            public void drain(Message m, ISourceController<Message> controller) {
-                try {
-                    write(m, controller);
-                } catch (InterruptedException e) {
-                    // TODO Auto-generated catch block
-                    Thread.currentThread().interrupt();
-                }
-            }
-        });
-
-        // We must watch the output for open and close of flows and communicate
-        // it
-        // to the peer.
-        output.addFlowLifeCycleListener(new FlowLifeCycleListener() {
-            public void onFlowClosed(IFlowResource resource, Flow flow) {
-                try {
-                    write(new FlowClose(flow), null);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
-            }
-
-            public void onFlowOpened(IFlowResource resource, Flow flow) {
-                try {
-                    // Set the limiter to a WindowedLimiter capable of handling
-                    // flow control messages from the peer:
-                    output.getFlowController(flow).setLimiter(new WindowLimiter<Message>(true, flow, outputQueueSize, resumeThreshold));
-                    // Tell the other side that we've open a flow.
-                    write(new FlowOpen(flow), null);
-
-                    FlowController<Message> controller = output.getFlowController(flow);
-                    if (controller != null) {
-                        controller.setLimiter(new WindowLimiter<Message>(true, flow, outputQueueSize, resumeThreshold));
-                        // Tell the other side that we've open a flow.
-                        write(new FlowOpen(flow), null);
-                    }
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
-            }
-
-        });
-
-        output.setDispatchPriority(0);
-
-    }
-
-    public final void simulateEncodingWork() {
-        if (MockBrokerTest.IO_WORK_AMOUNT > 1) {
-            fib(MockBrokerTest.IO_WORK_AMOUNT);
-        }
-    }
-
-    public final void fib(int n) {
-        if (n > 1) {
-            fib(n - 1);
-            fib(n - 2);
-        }
-    }
-
-    private interface ProtocolLimiter {
-        public void onProtocolMessage(Message m);
-    }
-
-    private class WindowLimiter<E> extends SizeLimiter<E> implements ProtocolLimiter {
-        final Flow flow;
-        final boolean clientMode;
-        private int available;
-
-        public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) {
-            super(capacity, resumeThreshold);
-            this.clientMode = clientMode;
-            this.flow = flow;
-        }
-
-        protected void remove(int size) {
-            super.remove(size);
-            if (!clientMode) {
-                available += size;
-                if (available > capacity - resumeThreshold) {
-                    try {
-                        write(new FlowMessage(flow, available, 0), null);
-                    } catch (InterruptedException e) {
-                        // TODO Auto-generated catch block
-                        e.printStackTrace();
-                    }
-                    available = 0;
-                }
-            }
-        }
-
-        public void onProtocolMessage(Message m) {
-            if (m.type() == Message.TYPE_FLOW_CONTROL) {
-                FlowMessage fm = (FlowMessage) m;
-                super.remove(fm.size);
-            }
-        }
-
-        public int getElementSize(Message m) {
-            return m.getFlowLimiterSize();
-        }
-    }
-
-    public final void start() throws Exception {
-        running.set(true);
-        if (MockBrokerTest.DISPATCH_MODE == BLOCKING) {
-            listener = new Thread(new Runnable() {
-                public void run() {
-                    try {
-                        while (true) {
-                            input.blockingDispatch();
-                        }
-                    } catch (InterruptedException e) {
-                        return;
-                    }
-                }
-            }, name + "-Listener");
-            listener.start();
-            sender = new Thread(new Runnable() {
-                public void run() {
-                    try {
-                        while (true) {
-                            output.blockingDispatch();
-                        }
-                    } catch (InterruptedException e) {
-                    }
-                }
-            }, name + "-Sender");
-            sender.start();
-        } else {
-            output.setDispatcher(broker.getDispatcher());
-            input.setDispatcher(broker.getDispatcher());
-            return;
-        }
-
-    }
-
-    public final void stop() throws Exception {
-        running.set(false);
-        if (MockBrokerTest.DISPATCH_MODE == BLOCKING) {
-            listener.interrupt();
-            listener.join();
-            sender.interrupt();
-            sender.join();
-        }
-    }
-
-    protected abstract Message getNextMessage() throws InterruptedException;
-
-    protected abstract Message pollNextMessage();
-
-    protected abstract void addReadReadyListener(ReadReadyListener listener);
-
-    /**
-     * Must be implemented to write a message.
-     * 
-     * @param m
-     */
-    protected abstract void write(Message m, ISourceController<Message> controller) throws InterruptedException;
-
-    /**
-     * Handles a received message.
-     */
-    protected abstract void messageReceived(Message m, ISourceController<Message> controller);
-
-    /**
-     * Simulates a network source of messages.
-     * 
-     * @param <E>
-     */
-    protected class NetworkSource extends AbstractLimitedFlowSource<Message> implements IBlockingFlowSource<Message>, IAsynchronousFlowSource<Message>, IPollableFlowSource<Message>,
-            FlowControllable<Message> {
-        private final FlowController<Message> flowController;
-        private final Flow flow;
-        private final IFlowQueue<Message> inputQueue;
-        private DispatchContext dispatchContext;
-
-        public NetworkSource(Flow flow, String name, int capacity, int resumeThreshold) {
-            super(name);
-            if (flow == null) {
-                if (MockBrokerTest.USE_INPUT_QUEUES) {
-                    inputQueue = TestFlowManager.createFlowQueue(flow, name, capacity, resumeThreshold);
-                } else {
-                    inputQueue = null;
-                }
-                flowController = null;
-            } else {
-                if (MockBrokerTest.USE_INPUT_QUEUES) {
-                    if (MockBrokerTest.PRIORITY_LEVELS <= 1) {
-                        inputQueue = TestFlowManager.createFlowQueue(flow, name, capacity, resumeThreshold);
-                    } else {
-                        SingleFlowPriorityQueue<Message> t = new SingleFlowPriorityQueue<Message>(flow, name, new SizeLimiter<Message>(capacity, resumeThreshold));
-                        t.setPriorityMapper(Message.PRIORITY_MAPPER);
-                        inputQueue = t;
-                    }
-                    flowController = inputQueue.getFlowController(flow);
-                    // Allow overflow we should be limited by protocol:
-                    flowController.useOverFlowQueue(false);
-                    flowController.setLimiter(new SizeLimiter<Message>(capacity, resumeThreshold));
-                    super.onFlowOpened(flowController);
-                } else {
-                    inputQueue = null;
-                    SizeLimiter<Message> limiter = new SizeLimiter<Message>(capacity, resumeThreshold);
-                    flowController = new FlowController<Message>(this, flow, limiter, this);
-                    // Allow overflow we should be limited by protocol:
-                    flowController.useOverFlowQueue(false);
-                    super.onFlowOpened(flowController);
-                }
-            }
-            this.flow = flow;
-        }
-
-        public synchronized void setDispatcher(final IDispatcher dispatcher) {
-
-            if (inputQueue != null) {
-                inputQueue.setDrain(new IFlowDrain<Message>() {
-
-                    public void drain(Message elem, ISourceController<Message> controller) {
-                        messageReceived(elem, controller);
-                        controller.elementDispatched(elem);
-                    }
-
-                });
-                inputQueue.setDispatcher(dispatcher);
-                inputQueue.setDispatchPriority(0);
-            }
-
-            dispatchContext = dispatcher.register(new Dispatchable() {
-                public boolean dispatch() {
-                    if (!pollingDispatch()) {
-                        addReadReadyListener(new ReadReadyListener() {
-                            public void onReadReady() {
-                                if (running.get()) {
-                                    dispatchContext.requestDispatch();
-                                }
-                            }
-                        });
-                        return true;
-                    }
-                    return false;
-                }
-            }, name + "-IOInbound");
-
-            // For reading messages assume maximum priority: These are placed
-            // into
-            // the input queue, where message priority will dictate broker
-            // dispatch
-            // priority. Note that flow control from the input queue will limit
-            // dispatch
-            // of lower priority messages:
-            if (MockBrokerTest.USE_INPUT_QUEUES) {
-                dispatchContext.updatePriority(Message.MAX_PRIORITY);
-            }
-            dispatchContext.requestDispatch();
-
-        }
-
-        public FlowController<Message> getFlowController(Flow flow) {
-            if (this.flow != null) {
-                return flowController;
-            } else {
-                return super.getFlowController(flow);
-            }
-        }
-
-        public void blockingDispatch() throws InterruptedException {
-            Message m = getNextMessage();
-            dispatch(m, getFlowController(m.getFlow()));
-        }
-
-        public void dispatch(Message m, FlowController<Message> controller) {
-
-            switch (m.type()) {
-            case Message.TYPE_FLOW_CONTROL: {
-                FlowMessage fm = (FlowMessage) m;
-                synchronized (output) {
-                    try {
-                        FlowController<Message> outputController = output.getFlowController(fm.getFlow());
-                        ProtocolLimiter pl = (ProtocolLimiter) outputController.getLimiter();
-                        synchronized (output) {
-                            pl.onProtocolMessage(fm);
-                        }
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-                break;
-            }
-            case Message.TYPE_FLOW_OPEN: {
-                FlowController<Message> inputController = new FlowController<Message>(this, m.getFlow(), new WindowLimiter<Message>(false, m.getFlow(), inputWindowSize, inputResumeThreshold), this);
-                // Allow overflow we should be limited by protocol:
-                inputController.useOverFlowQueue(false);
-                super.onFlowOpened(inputController);
-                break;
-            }
-            case Message.TYPE_FLOW_CLOSE: {
-                super.onFlowClosed(m.getFlow());
-                break;
-            }
-            default: {
-                if (inputQueue != null) {
-                    inputQueue.add(m, null);
-                } else {
-                    controller.add(m, null);
-                }
-            }
-            }
-        }
-
-        public void addFlowReadyListener(final IPollableFlowSource.FlowReadyListener<Message> listener) {
-            addReadReadyListener(new ReadReadyListener() {
-
-                public void onReadReady() {
-                    listener.onFlowReady(NetworkSource.this);
-                }
-
-            });
-        }
-
-        public boolean pollingDispatch() {
-
-            Message m = pollNextMessage();
-            if (m != null) {
-                dispatch(m, getFlowController(m.getFlow()));
-                return true;
-            }
-            return false;
-        }
-
-        // Called by FlowController.add()....
-        public void flowElemAccepted(final ISourceController<Message> controller, final Message elem) {
-            messageReceived(elem, controller);
-            controller.elementDispatched(elem);
-        }
-
-        public IFlowSink<Message> getFlowSink() {
-            // No sink, this is a source only:
-            return null;
-        }
-
-        public IFlowSource<Message> getFlowSource() {
-            return this;
-        }
-
-        public boolean isDispatchReady() {
-            return true;
-        }
-    }
-
-    public class FlowMessage extends Message {
-        int size;
-        int count;
-
-        FlowMessage(Flow flow, int size, int count) {
-            super(0, 0, null, flow, null, 0);
-            this.size = size;
-            this.count = count;
-        }
-
-        public short type() {
-            return TYPE_FLOW_CONTROL;
-        }
-
-        public boolean isSystem() {
-            return true;
-        }
-
-        public int getSize() {
-            return size;
-        }
-    }
-
-    public class FlowOpen extends Message {
-        FlowOpen(Flow flow) {
-            super(0, 0, null, flow, null, 0);
-        }
-
-        public short type() {
-            return TYPE_FLOW_OPEN;
-        }
-
-        public boolean isSystem() {
-            return true;
-        }
-    }
-
-    public class FlowClose extends Message {
-        FlowClose(Flow flow) {
-            super(0, 0, null, flow, null, 0);
-        }
-
-        public short type() {
-            return TYPE_FLOW_CLOSE;
-        }
-
-        public boolean isSystem() {
-            return true;
-        }
-    }
-
-    public interface ReadReadyListener {
-        public void onReadReady();
-    }
-
-    public String getName() {
-        return name;
-    }
-
-}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java Mon Feb 16 15:32:50 2009
@@ -1,61 +1,5 @@
-/**
- * 
- */
 package org.apache.activemq.flow;
 
+public class BrokerConnection extends RemoteConnection {
 
-class BrokerConnection extends AbstractTestConnection implements MockBroker.DeliveryTarget {
-    private final Pipe<Message> pipe;
-    private final MockBroker local;
-
-    BrokerConnection(MockBroker local, MockBroker remote, Pipe<Message> pipe) {
-        super(local, remote.getName(), null, pipe);
-        this.pipe = pipe;
-        this.local = local;
-    }
-
-    @Override
-    protected Message getNextMessage() throws InterruptedException {
-        return pipe.read();
-    }
-
-    @Override
-    protected void addReadReadyListener(final ReadReadyListener listener) {
-        pipe.setReadReadyListener(new Pipe.ReadReadyListener<Message>() {
-            public void onReadReady(Pipe<Message> pipe) {
-                listener.onReadReady();
-            }
-        });
-    }
-
-    public Message pollNextMessage() {
-        return pipe.poll();
-    }
-
-    @Override
-    protected void messageReceived(Message m, ISourceController<Message> controller) {
-
-        m = new Message(m);
-        m.incrementHopCount();
-
-        local.router.route(controller, m);
-    }
-
-    @Override
-    protected void write(Message m, ISourceController<Message> controller) throws InterruptedException {
-        pipe.write(m);
-    }
-
-    public IFlowSink<Message> getSink() {
-        return output;
-    }
-
-    public boolean match(Message message) {
-        // Avoid loops:
-        if (message.getHopCount() > 0) {
-            return false;
-        }
-
-        return true;
-    }
-}
\ No newline at end of file
+}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java Mon Feb 16 15:32:50 2009
@@ -1,5 +1,18 @@
 /**
- * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.activemq.flow;
 
@@ -10,6 +23,7 @@
 
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.transport.DispatchableTransportServer;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.TransportFactory;
@@ -100,8 +114,13 @@
         
         transportServer = TransportFactory.bind(new URI(uri));
         transportServer.setAcceptListener(this);
+        if(transportServer instanceof DispatchableTransportServer)
+        {
+        	((DispatchableTransportServer)transportServer).setDispatcher(dispatcher);
+        }
         transportServer.start();
         
+        
         dispatcher.start();
 
         for (MockQueue queue : queues.values()) {
@@ -127,6 +146,7 @@
         connection.setTransport(transport);
         connection.setPriorityLevels(MockBrokerTest.PRIORITY_LEVELS);
         connection.setDispatcher(dispatcher);
+        connections.add(connection);
         try {
             connection.start();
         } catch (Exception e1) {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=744939&r1=744938&r2=744939&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Mon Feb 16 15:32:50 2009
@@ -29,6 +29,7 @@
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.Period;
 import org.apache.activemq.queue.Mapper;
+import org.apache.activemq.transport.nio.SelectorManager;
 
 public class MockBrokerTest extends TestCase {
 
@@ -48,10 +49,8 @@
     boolean ptp = false;
 
     // Set to use tcp IO
-    boolean tcp = true;
+    boolean tcp = false;
 
-    // Can be set to BLOCKING, POLLING or ASYNC
-    public final static int DISPATCH_MODE = AbstractTestConnection.ASYNC;
     // Set's the number of threads to use:
     private final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors();
     boolean usePartitionedQueue = false;
@@ -317,11 +316,9 @@
 
     private void createConnections() throws IOException, URISyntaxException {
 
-        if (DISPATCH_MODE == AbstractTestConnection.ASYNC || DISPATCH_MODE == AbstractTestConnection.POLLING) {
-            dispatcher = new PriorityPooledDispatcher("BrokerDispatcher", asyncThreadPoolSize, Message.MAX_PRIORITY);
-            FlowController.setFlowExecutor(dispatcher.createPriorityExecutor(Message.MAX_PRIORITY));
-        }
-
+        dispatcher = new PriorityPooledDispatcher("BrokerDispatcher", asyncThreadPoolSize, Message.MAX_PRIORITY);
+        FlowController.setFlowExecutor(dispatcher.createPriorityExecutor(Message.MAX_PRIORITY));
+        
         if (multibroker) {
             if( tcp ) {
                 sendBroker = createBroker("SendBroker", "tcp://localhost:10000?wireFormat=proto");
@@ -384,6 +381,7 @@
         consumer.setDestination(destination);
         consumer.setName("consumer"+(i+1));
         consumer.setTotalConsumerRate(totalConsumerRate);
+        consumer.setDispatcher(dispatcher);
         return consumer;
     }
 
@@ -395,6 +393,7 @@
         producer.setDestination(destination);
         producer.setMessageIdGenerator(msgIdGenerator);
         producer.setTotalProducerRate(totalProducerRate);
+        producer.setDispatcher(dispatcher);
         return producer;
     }
 
@@ -431,6 +430,7 @@
         for (MockBroker broker : brokers) {
             broker.startServices();
         }
+        SelectorManager.SINGLETON.setChannelExecutor(dispatcher.createPriorityExecutor(PRIORITY_LEVELS));
     }
 
 }