You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:24:44 UTC

svn commit: r961062 [3/14] - in /activemq/sandbox/activemq-apollo-actor: ./ activemq-all/ activemq-all/src/test/ide-resources/ activemq-all/src/test/java/org/apache/activemq/jaxb/ activemq-all/src/test/java/org/apache/activemq/legacy/ activemq-all/src/...

Modified: activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Jul  7 03:24:02 2010
@@ -16,309 +16,135 @@
  */
 package org.apache.activemq.transport.tcp;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ServerSocketFactory;
-
-import org.apache.activemq.Service;
-//import org.apache.activemq.ThreadPriorities;
 import org.apache.activemq.transport.Transport;
-//import org.apache.activemq.transport.TransportLoggerFactory;
+import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.TransportServerThreadSupport;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.ServiceListener;
-import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.DispatchSource;
+
+import java.io.IOException;
+import java.net.*;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * A TCP based implementation of {@link TransportServer}
- * 
- * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
- * @version $Revision: 1.1 $
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 
-public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{
+public class TcpTransportServer implements TransportServer {
 
-    private static final Log LOG = LogFactory.getLog(TcpTransportServer.class);
-    protected ServerSocket serverSocket;
-    protected int backlog = 5000;
     protected WireFormatFactory wireFormatFactory;
-    protected final TcpTransportFactory transportFactory;
-    protected long maxInactivityDuration = 30000;
-    protected long maxInactivityDurationInitalDelay = 10000;
-    protected int minmumWireFormatVersion;
-    protected boolean useQueueForAccept=true;
-       
-    /**
-     * trace=true -> the Transport stack where this TcpTransport
-     * object will be, will have a TransportLogger layer
-     * trace=false -> the Transport stack where this TcpTransport
-     * object will be, will NOT have a TransportLogger layer, and therefore
-     * will never be able to print logging messages.
-     * This parameter is most probably set in Connection or TransportConnector URIs.
-     */
-    protected boolean trace = false;
-
-    protected int soTimeout = 0;
-    protected int socketBufferSize = 64 * 1024;
-    protected int connectionTimeout =  30000;
+    private ServerSocketChannel channel;
+    private TransportAcceptListener listener;
+    private URI bindURI;
+    private URI connectURI;
+    private DispatchQueue dispatchQueue;
+    private DispatchSource acceptSource;
+    private int backlog = 500;
+    private Map<String, Object> transportOptions;
 
-    /**
-     * Name of the LogWriter implementation to use.
-     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
-     * This parameter is most probably set in Connection or TransportConnector URIs.
-     
-    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
-    */
-    
-    /**
-     * Specifies if the TransportLogger will be manageable by JMX or not.
-     * Also, as long as there is at least 1 TransportLogger which is manageable,
-     * a TransportLoggerControl MBean will me created.
-     */
-    protected boolean dynamicManagement = false;
-    /**
-     * startLogging=true -> the TransportLogger object of the Transport stack
-     * will initially write messages to the log.
-     * startLogging=false -> the TransportLogger object of the Transport stack
-     * will initially NOT write messages to the log.
-     * This parameter only has an effect if trace == true.
-     * This parameter is most probably set in Connection or TransportConnector URIs.
-     */
-    protected boolean startLogging = true;
-    protected Map<String, Object> transportOptions;
-    protected final ServerSocketFactory serverSocketFactory;
-    protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
-    protected Thread socketHandlerThread;
-    /**
-     * The maximum number of sockets allowed for this server
-     */
-    protected int maximumConnections = Integer.MAX_VALUE;
-    protected int currentTransportCount=0;
-  
-    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
-        super(location);
-        this.transportFactory = transportFactory;
-        this.serverSocketFactory = serverSocketFactory;
-        
+    public TcpTransportServer(URI location) {
+        this.bindURI = location;
     }
 
-    public void bind() throws IOException {
-        URI bind = getBindLocation();
-
-        String host = bind.getHost();
-        host = (host == null || host.length() == 0) ? "localhost" : host;
-        InetAddress addr = InetAddress.getByName(host);
-
-        try {
-
-            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
-            configureServerSocket(this.serverSocket);
-            
-        } catch (IOException e) {
-            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
-        }
-        try {
-            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
-                .getFragment()));
-        } catch (URISyntaxException e) {
-
-            // it could be that the host name contains invalid characters such
-            // as _ on unix platforms
-            // so lets try use the IP address instead
-            try {
-                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
-            } catch (URISyntaxException e2) {
-                throw IOExceptionSupport.create(e2);
-            }
-        }
+    public void setAcceptListener(TransportAcceptListener listener) {
+        this.listener = listener;
     }
 
-    private void configureServerSocket(ServerSocket socket) throws SocketException {
-        socket.setSoTimeout(2000);
-        if (transportOptions != null) {
-            IntrospectionSupport.setProperties(socket, transportOptions);
-        }
+    public URI getConnectURI() {
+        return connectURI;
     }
 
-    /**
-     * @return Returns the wireFormatFactory.
-     */
-    public WireFormatFactory getWireFormatFactory() {
-        return wireFormatFactory;
-    }
-
-    /**
-     * @param wireFormatFactory The wireFormatFactory to set.
-     */
-    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
-        this.wireFormatFactory = wireFormatFactory;
-    }
-
-    public long getMaxInactivityDuration() {
-        return maxInactivityDuration;
-    }
-
-    public void setMaxInactivityDuration(long maxInactivityDuration) {
-        this.maxInactivityDuration = maxInactivityDuration;
-    }
-    
-    public long getMaxInactivityDurationInitalDelay() {
-        return this.maxInactivityDurationInitalDelay;
-    }
-
-    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
-        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
-    }
-
-    public int getMinmumWireFormatVersion() {
-        return minmumWireFormatVersion;
-    }
-
-    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
-        this.minmumWireFormatVersion = minmumWireFormatVersion;
-    }
-
-    public boolean isTrace() {
-        return trace;
+    public InetSocketAddress getSocketAddress() {
+        return (InetSocketAddress) channel.socket().getLocalSocketAddress();
     }
 
-    public void setTrace(boolean trace) {
-        this.trace = trace;
+    public DispatchQueue getDispatchQueue() {
+        return dispatchQueue;
     }
-//    
-//    public String getLogWriterName() {
-//        return logWriterName;
-//    }
-//
-//    public void setLogWriterName(String logFormat) {
-//        this.logWriterName = logFormat;
-//    }        
 
-    public boolean isDynamicManagement() {
-        return dynamicManagement;
+    public void setDispatchQueue(DispatchQueue dispatchQueue) {
+        this.dispatchQueue = dispatchQueue;
     }
 
-    public void setDynamicManagement(boolean useJmx) {
-        this.dynamicManagement = useJmx;
+    public void suspend() {
+        acceptSource.suspend(); // pause delivery of accept events; was resume(), which made suspend() behave exactly like resume()
     }
 
-    public boolean isStartLogging() {
-        return startLogging;
+    public void resume() {
+        acceptSource.resume();
     }
 
-
-    public void setStartLogging(boolean startLogging) {
-        this.startLogging = startLogging;
-    }
-    
-    /**
-     * @return the backlog
-     */
-    public int getBacklog() {
-        return backlog;
+    public void start() throws IOException {
+        bind();
+        acceptSource = Dispatch.createSource(channel, SelectionKey.OP_ACCEPT, dispatchQueue);
+        acceptSource.setEventHandler(new Runnable() {
+            public void run() {
+                try {
+                    SocketChannel client = channel.accept();
+                    handleSocket(client);
+                } catch (IOException e) {
+                    listener.onAcceptError(e);
+                }
+            }
+        });
+        acceptSource.setCancelHandler(new Runnable() {
+            public void run() {
+                try {
+                    channel.close();
+                } catch (IOException e) {
+                }
+            }
+        });
+        acceptSource.resume();
     }
 
-    /**
-     * @param backlog the backlog to set
-     */
-    public void setBacklog(int backlog) {
-        this.backlog = backlog;
-    }
+    public void bind() throws IOException {
+        URI bind = bindURI;
 
-    /**
-     * @return the useQueueForAccept
-     */
-    public boolean isUseQueueForAccept() {
-        return useQueueForAccept;
-    }
+        String host = bind.getHost();
+        host = (host == null || host.length() == 0) ? "localhost" : host;
+        if (host.equals("localhost")) {
+            host = "0.0.0.0";
+        }
 
-    /**
-     * @param useQueueForAccept the useQueueForAccept to set
-     */
-    public void setUseQueueForAccept(boolean useQueueForAccept) {
-        this.useQueueForAccept = useQueueForAccept;
-    }
-    
+        InetAddress addr = InetAddress.getByName(host);
+        try {
+            channel = ServerSocketChannel.open();
+            channel.socket().bind(new InetSocketAddress(addr, bind.getPort()), backlog);
+        } catch (IOException e) {
+            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
+        }
 
-    /**
-     * pull Sockets from the ServerSocket
-     */
-    public void run() {
-        while (!isStopped()) {
-            Socket socket = null;
+        try {
+            connectURI = connectURI(resolveHostName(channel.socket(), addr));
+        } catch (URISyntaxException e) {
+            // it could be that the host name contains invalid characters such
+            // as _ on unix platforms
+            // so lets try use the IP address instead
             try {
-                socket = serverSocket.accept();
-                if (socket != null) {
-                    if (isStopped() || getAcceptListener() == null) {
-                        socket.close();
-                    } else {
-                        if (useQueueForAccept) {
-                            socketQueue.put(socket);
-                        }else {
-                            handleSocket(socket);
-                        }
-                    }
-                }
-            } catch (SocketTimeoutException ste) {
-                // expect this to happen
-            } catch (Exception e) {
-                if (!isStopping()) {
-                    onAcceptError(e);
-                } else if (!isStopped()) {
-                    LOG.warn("run()", e);
-                    onAcceptError(e);
-                }
+                connectURI = connectURI(addr.getHostAddress());
+            } catch (URISyntaxException e2) {
+                throw IOExceptionSupport.create(e2);
             }
         }
     }
 
-    /**
-     * Allow derived classes to override the Transport implementation that this
-     * transport server creates.
-     * 
-     * @param socket
-     * @param format
-     * @return
-     * @throws IOException
-     */
-    protected  Transport createTransport(Socket socket, WireFormat format) throws IOException {
-        return new TcpTransport(format, socket);
-    }
-
-    /**
-     * @return pretty print of this
-     */
-    public String toString() {
-        return "" + getBindLocation();
+    private URI connectURI(String hostname) throws URISyntaxException {
+        return new URI(bindURI.getScheme(), bindURI.getUserInfo(), hostname, channel.socket().getLocalPort(), bindURI.getPath(), bindURI.getQuery(), bindURI.getFragment());
     }
 
-    /**
-     * @param socket 
-     * @param inetAddress
-     * @return real hostName
-     * @throws UnknownHostException
-     */
     protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
         String result = null;
         if (socket.isBound()) {
@@ -333,144 +159,426 @@ public class TcpTransportServer extends 
         }
         return result;
     }
-    
-    protected void doStart() throws Exception {
-        if(useQueueForAccept) {
-            Runnable run = new Runnable() {
-                public void run() {
-                    try {
-                        while (!isStopped() && !isStopping()) {
-                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
-                            if (sock != null) {
-                                handleSocket(sock);
-                            }
-                        }
-    
-                    } catch (InterruptedException e) {
-                        LOG.info("socketQueue interuppted - stopping");
-                        if (!isStopping()) {
-                            onAcceptError(e);
-                        }
-                    }
-    
-                }
-    
-            };
-            socketHandlerThread = new Thread(null, run,
-                    "ActiveMQ Transport Server Thread Handler: " + toString(),
-                    getStackSize());
-            socketHandlerThread.setDaemon(true);
-            //socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
-            socketHandlerThread.start();
-        }
-        super.doStart();
-        
-    }
 
-    protected void doStop(ServiceStopper stopper) throws Exception {
-        super.doStop(stopper);
-        if (serverSocket != null) {
-            serverSocket.close();
-        }
+    public void stop() throws Exception {
+        acceptSource.release();
     }
 
-    public InetSocketAddress getSocketAddress() {
-        return (InetSocketAddress)serverSocket.getLocalSocketAddress();
+    public WireFormatFactory getWireFormatFactory() {
+        return wireFormatFactory;
     }
 
-    public void setTransportOption(Map<String, Object> transportOptions) {
-        this.transportOptions = transportOptions;
+    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
+        this.wireFormatFactory = wireFormatFactory;
     }
-    
-    protected final void handleSocket(Socket socket) {
-        try {
-            if (this.currentTransportCount >= this.maximumConnections) {
-                
-            }else {
-            HashMap<String, Object> options = new HashMap<String, Object>();
-            options.put("maxInactivityDuration", Long
-                    .valueOf(maxInactivityDuration));
-            options.put("maxInactivityDurationInitalDelay", Long
-                    .valueOf(maxInactivityDurationInitalDelay));            
-            options.put("minmumWireFormatVersion", Integer
-                    .valueOf(minmumWireFormatVersion));
-            options.put("trace", Boolean.valueOf(trace));
-            options.put("soTimeout", Integer.valueOf(soTimeout));
-            options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
-            options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
-//            options.put("logWriterName", logWriterName);
-            options
-                    .put("dynamicManagement", Boolean
-                            .valueOf(dynamicManagement));
-            options.put("startLogging", Boolean.valueOf(startLogging));
-
-            options.putAll(transportOptions);
-            WireFormat format = wireFormatFactory.createWireFormat();
-            Transport transport = createTransport(socket, format);
-            if (transport instanceof ServiceSupport) {
-                ((ServiceSupport) transport).addServiceListener(this);
-            }
-            Transport configuredTransport = transportFactory.serverConfigure(
-                    transport, format, options);
-            getAcceptListener().onAccept(configuredTransport);
-            }
-        } catch (SocketTimeoutException ste) {
-            // expect this to happen
-        } catch (Exception e) {
-            if (!isStopping()) {
-                onAcceptError(e);
-            } else if (!isStopped()) {
-                LOG.warn("run()", e);
-                onAcceptError(e);
-            }
-        }
-        
-    }    
 
-    public int getSoTimeout() {
-        return soTimeout;
+    public URI getBindURI() {
+        return bindURI;
     }
 
-    public void setSoTimeout(int soTimeout) {
-        this.soTimeout = soTimeout;
+    public void setBindURI(URI bindURI) {
+        this.bindURI = bindURI;
     }
 
-    public int getSocketBufferSize() {
-        return socketBufferSize;
+    public int getBacklog() {
+        return backlog;
     }
 
-    public void setSocketBufferSize(int socketBufferSize) {
-        this.socketBufferSize = socketBufferSize;
+    public void setBacklog(int backlog) {
+        this.backlog = backlog;
     }
 
-    public int getConnectionTimeout() {
-        return connectionTimeout;
+    protected final void handleSocket(SocketChannel socket) throws IOException {
+        HashMap<String, Object> options = new HashMap<String, Object>();
+//      options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
+//      options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
+//      options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
+//      options.put("trace", Boolean.valueOf(trace));
+//      options.put("soTimeout", Integer.valueOf(soTimeout));
+//      options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
+//      options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
+//      options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
+//      options.put("startLogging", Boolean.valueOf(startLogging));
+
+        Transport transport = createTransport(socket, options);
+        transport.setWireformat(wireFormatFactory.createWireFormat());
+        listener.onAccept(transport);
+    }
+
+    private Transport createTransport(SocketChannel socketChannel, HashMap<String, Object> options) throws IOException {
+        TcpTransport transport = new TcpTransport();
+        transport.connected(socketChannel);
+        if( options!=null ) {
+            IntrospectionSupport.setProperties(transport, options);
+        }
+        if (transportOptions != null) {
+            IntrospectionSupport.setProperties(transport, transportOptions);
+        }
+        return transport;
     }
 
-    public void setConnectionTimeout(int connectionTimeout) {
-        this.connectionTimeout = connectionTimeout;
+    public void setTransportOption(Map<String, Object> transportOptions) {
+        this.transportOptions = transportOptions;
     }
 
-    /**
-     * @return the maximumConnections
-     */
-    public int getMaximumConnections() {
-        return maximumConnections;
-    }
+
+
+//    private static final Log LOG = LogFactory.getLog(TcpTransportServer.class);
+//    protected ServerSocket serverSocket;
+//    protected int backlog = 5000;
+//    protected WireFormatFactory wireFormatFactory;
+//    protected final TcpTransportFactory transportFactory;
+//    protected long maxInactivityDuration = 30000;
+//    protected long maxInactivityDurationInitalDelay = 10000;
+//    protected int minmumWireFormatVersion;
+//    protected boolean useQueueForAccept=true;
+//
+//    /**
+//     * trace=true -> the Transport stack where this TcpTransport
+//     * object will be, will have a TransportLogger layer
+//     * trace=false -> the Transport stack where this TcpTransport
+//     * object will be, will NOT have a TransportLogger layer, and therefore
+//     * will never be able to print logging messages.
+//     * This parameter is most probably set in Connection or TransportConnector URIs.
+//     */
+//    protected boolean trace = false;
+//
+//    protected int soTimeout = 0;
+//    protected int socketBufferSize = 64 * 1024;
+//    protected int connectionTimeout =  30000;
+//
+//    /**
+//     * Name of the LogWriter implementation to use.
+//     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
+//     * This parameter is most probably set in Connection or TransportConnector URIs.
+//
+//    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
+//    */
+//
+//    /**
+//     * Specifies if the TransportLogger will be manageable by JMX or not.
+//     * Also, as long as there is at least 1 TransportLogger which is manageable,
+//     * a TransportLoggerControl MBean will me created.
+//     */
+//    protected boolean dynamicManagement = false;
+//    /**
+//     * startLogging=true -> the TransportLogger object of the Transport stack
+//     * will initially write messages to the log.
+//     * startLogging=false -> the TransportLogger object of the Transport stack
+//     * will initially NOT write messages to the log.
+//     * This parameter only has an effect if trace == true.
+//     * This parameter is most probably set in Connection or TransportConnector URIs.
+//     */
+//    protected boolean startLogging = true;
+//    protected Map<String, Object> transportOptions;
+//    protected final ServerSocketFactory serverSocketFactory;
+//    protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
+//    protected Thread socketHandlerThread;
+//    /**
+//     * The maximum number of sockets allowed for this server
+//     */
+//    protected int maximumConnections = Integer.MAX_VALUE;
+//    protected int currentTransportCount=0;
+//
+//    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+//        super(location);
+//        this.transportFactory = transportFactory;
+//        this.serverSocketFactory = serverSocketFactory;
+//
+//    }
+//
+//    public void bind() throws IOException {
+//        URI bind = getBindLocation();
+//
+//        String host = bind.getHost();
+//        host = (host == null || host.length() == 0) ? "localhost" : host;
+//        InetAddress addr = InetAddress.getByName(host);
+//
+//        try {
+//
+//            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
+//            configureServerSocket(this.serverSocket);
+//
+//        } catch (IOException e) {
+//            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
+//        }
+//        try {
+//            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
+//                .getFragment()));
+//        } catch (URISyntaxException e) {
+//
+//            // it could be that the host name contains invalid characters such
+//            // as _ on unix platforms
+//            // so lets try use the IP address instead
+//            try {
+//                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
+//            } catch (URISyntaxException e2) {
+//                throw IOExceptionSupport.create(e2);
+//            }
+//        }
+//    }
+//
+//    private void configureServerSocket(ServerSocket socket) throws SocketException {
+//        socket.setSoTimeout(2000);
+//        if (transportOptions != null) {
+//            IntrospectionSupport.setProperties(socket, transportOptions);
+//        }
+//    }
+//
+//    /**
+//     * @return Returns the wireFormatFactory.
+//     */
+//    public WireFormatFactory getWireFormatFactory() {
+//        return wireFormatFactory;
+//    }
+//
+//    /**
+//     * @param wireFormatFactory The wireFormatFactory to set.
+//     */
+//    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
+//        this.wireFormatFactory = wireFormatFactory;
+//    }
+//
+//    public long getMaxInactivityDuration() {
+//        return maxInactivityDuration;
+//    }
+//
+//    public void setMaxInactivityDuration(long maxInactivityDuration) {
+//        this.maxInactivityDuration = maxInactivityDuration;
+//    }
+//
+//    public long getMaxInactivityDurationInitalDelay() {
+//        return this.maxInactivityDurationInitalDelay;
+//    }
+//
+//    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
+//        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
+//    }
+//
+//    public int getMinmumWireFormatVersion() {
+//        return minmumWireFormatVersion;
+//    }
+//
+//    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
+//        this.minmumWireFormatVersion = minmumWireFormatVersion;
+//    }
+//
+//    public boolean isTrace() {
+//        return trace;
+//    }
+//
+//    public void setTrace(boolean trace) {
+//        this.trace = trace;
+//    }
+////
+////    public String getLogWriterName() {
+////        return logWriterName;
+////    }
+////
+////    public void setLogWriterName(String logFormat) {
+////        this.logWriterName = logFormat;
+////    }
+//
+//    public boolean isDynamicManagement() {
+//        return dynamicManagement;
+//    }
+//
+//    public void setDynamicManagement(boolean useJmx) {
+//        this.dynamicManagement = useJmx;
+//    }
+//
+//    public boolean isStartLogging() {
+//        return startLogging;
+//    }
+//
+//
+//    public void setStartLogging(boolean startLogging) {
+//        this.startLogging = startLogging;
+//    }
+//
+//    /**
+//     * @return the backlog
+//     */
+//    public int getBacklog() {
+//        return backlog;
+//    }
+//
+//    /**
+//     * @param backlog the backlog to set
+//     */
+//    public void setBacklog(int backlog) {
+//        this.backlog = backlog;
+//    }
+//
+//    /**
+//     * @return the useQueueForAccept
+//     */
+//    public boolean isUseQueueForAccept() {
+//        return useQueueForAccept;
+//    }
+//
+//    /**
+//     * @param useQueueForAccept the useQueueForAccept to set
+//     */
+//    public void setUseQueueForAccept(boolean useQueueForAccept) {
+//        this.useQueueForAccept = useQueueForAccept;
+//    }
+//
+//
+//    /**
+//     * pull Sockets from the ServerSocket
+//     */
+//    public void run() {
+//        while (!isStopped()) {
+//            Socket socket = null;
+//            try {
+//                socket = serverSocket.accept();
+//                if (socket != null) {
+//                    if (isStopped() || getAcceptListener() == null) {
+//                        socket.close();
+//                    } else {
+//                        if (useQueueForAccept) {
+//                            socketQueue.put(socket);
+//                        }else {
+//                            handleSocket(socket);
+//                        }
+//                    }
+//                }
+//            } catch (SocketTimeoutException ste) {
+//                // expect this to happen
+//            } catch (Exception e) {
+//                if (!isStopping()) {
+//                    onAcceptError(e);
+//                } else if (!isStopped()) {
+//                    LOG.warn("run()", e);
+//                    onAcceptError(e);
+//                }
+//            }
+//        }
+//    }
+//
+//    /**
+//     * Allow derived classes to override the Transport implementation that this
+//     * transport server creates.
+//     *
+//     * @param socket
+//     * @param format
+//     * @return
+//     * @throws IOException
+//     */
+//    protected  Transport createTransport(Socket socket, WireFormat format) throws IOException {
+//        return new TcpTransport(format, socket);
+//    }
+//
 
     /**
-     * @param maximumConnections the maximumConnections to set
+     * @return pretty print of this
      */
-    public void setMaximumConnections(int maximumConnections) {
-        this.maximumConnections = maximumConnections;
-    }
-
-    
-    public void started(Service service) {
-       this.currentTransportCount++;
+    public String toString() {
+        return "" + bindURI;
     }
+//
+//    /**
+//     * @param socket
+//     * @param inetAddress
+//     * @return real hostName
+//     * @throws UnknownHostException
+//     */
+//
+//    protected void doStart() throws Exception {
+//        if(useQueueForAccept) {
+//            Runnable run = new Runnable() {
+//                public void run() {
+//                    try {
+//                        while (!isStopped() && !isStopping()) {
+//                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
+//                            if (sock != null) {
+//                                handleSocket(sock);
+//                            }
+//                        }
+//
+//                    } catch (InterruptedException e) {
+//                        LOG.info("socketQueue interuppted - stopping");
+//                        if (!isStopping()) {
+//                            onAcceptError(e);
+//                        }
+//                    }
+//
+//                }
+//
+//            };
+//            socketHandlerThread = new Thread(null, run,
+//                    "ActiveMQ Transport Server Thread Handler: " + toString(),
+//                    getStackSize());
+//            socketHandlerThread.setDaemon(true);
+//            //socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
+//            socketHandlerThread.start();
+//        }
+//        super.doStart();
+//
+//    }
+//
+//    protected void doStop(ServiceStopper stopper) throws Exception {
+//        super.doStop(stopper);
+//        if (serverSocket != null) {
+//            serverSocket.close();
+//        }
+//    }
+//
+//    public InetSocketAddress getSocketAddress() {
+//        return (InetSocketAddress)serverSocket.getLocalSocketAddress();
+//    }
+//
+//    public void setTransportOption(Map<String, Object> transportOptions) {
+//        this.transportOptions = transportOptions;
+//    }
+//
+//
+//    public int getSoTimeout() {
+//        return soTimeout;
+//    }
+//
+//    public void setSoTimeout(int soTimeout) {
+//        this.soTimeout = soTimeout;
+//    }
+//
+//    public int getSocketBufferSize() {
+//        return socketBufferSize;
+//    }
+//
+//    public void setSocketBufferSize(int socketBufferSize) {
+//        this.socketBufferSize = socketBufferSize;
+//    }
+//
+//    public int getConnectionTimeout() {
+//        return connectionTimeout;
+//    }
+//
+//    public void setConnectionTimeout(int connectionTimeout) {
+//        this.connectionTimeout = connectionTimeout;
+//    }
+//
+//    /**
+//     * @return the maximumConnections
+//     */
+//    public int getMaximumConnections() {
+//        return maximumConnections;
+//    }
+//
+//    /**
+//     * @param maximumConnections the maximumConnections to set
+//     */
+//    public void setMaximumConnections(int maximumConnections) {
+//        this.maximumConnections = maximumConnections;
+//    }
+//
+//
+//    public void started(Service service) {
+//       this.currentTransportCount++;
+//    }
+//
+//    public void stopped(Service service) {
+//        this.currentTransportCount--;
+//    }
 
-    public void stopped(Service service) {
-        this.currentTransportCount--;
-    }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml Wed Jul  7 03:24:02 2010
@@ -34,6 +34,12 @@
   <dependencies>
   
     <dependency>
+      <groupId>org.fusesource.hawtdispatch</groupId>
+      <artifactId>hawtdispatch</artifactId>
+      <version>${hawtdispatch-version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-transport</artifactId>
     </dependency>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java Wed Jul  7 03:24:02 2010
@@ -19,33 +19,25 @@ package org.apache.activemq.apollo;
 import java.beans.ExceptionListener;
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.dispatch.Dispatcher;
-import org.apache.activemq.transport.DispatchableTransport;
+import org.apache.activemq.transport.CompletionCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
 
 abstract public class Connection implements TransportListener, Service {
 
     protected Transport transport;
     protected String name;
 
-    private int priorityLevels;
-    protected int outputWindowSize = 1024 * 1024;
-    protected int outputResumeThreshold = 900 * 1024;
-    protected int inputWindowSize = 1024 * 1024;
-    protected int inputResumeThreshold = 512 * 1024;
-    protected boolean useAsyncWriteThread = true;
-
-    private Dispatcher dispatcher;
-    private final AtomicBoolean stopping = new AtomicBoolean();
-    private ExecutorService blockingWriter;
-    private ExceptionListener exceptionListener;
+    protected DispatchQueue dispatchQueue = Dispatch.createQueue();
+    protected boolean stopping;
+    protected ExceptionListener exceptionListener;
 
     public void setTransport(Transport transport) {
         this.transport = transport;
@@ -53,44 +45,20 @@ abstract public class Connection impleme
 
     public void start() throws Exception {
         transport.setTransportListener(this);
-        
-        if (transport instanceof DispatchableTransport) {
-            DispatchableTransport dt = ((DispatchableTransport) transport);
-            if (name != null) {
-                dt.setName(name);
-            }
-            dt.setDispatcher(getDispatcher());
-        } else {
-            if (useAsyncWriteThread) {
-                blockingWriter = Executors.newSingleThreadExecutor(new ThreadFactory() {
-                    public Thread newThread(Runnable r) {
-                        return new Thread(r, "Writer-" + name);
-                    }
-                });
-            }
-        }
+        transport.setDispatchQueue(dispatchQueue);
         transport.start();
     }
 
     public void stop() throws Exception {
-        stopping.set(true);
+        stopping = true;
         if (transport != null) {
             transport.stop();
         }
-        if (blockingWriter != null) {
-            blockingWriter.shutdown();
-        }
+        dispatchQueue.release();
     }
 
     public void setName(String name) {
         this.name = name;
-        if (blockingWriter != null) {
-            blockingWriter.execute(new Runnable() {
-                public void run() {
-                    Thread.currentThread().setName("Writer-" + Connection.this.name);
-                }
-            });
-        }
     }
 
     public String getName() {
@@ -104,36 +72,8 @@ abstract public class Connection impleme
         write(o, null);
     }
 
-    public final void write(final Object o, final Runnable onCompleted) {
-        if (blockingWriter == null) {
-            try {
-                transport.oneway(o);
-                if (onCompleted != null) {
-                    onCompleted.run();
-                }
-            } catch (IOException e) {
-                onException(e);
-            }
-        } else {
-            try {
-                blockingWriter.execute(new Runnable() {
-                    public void run() {
-                        if (!stopping.get()) {
-                            try {
-                                transport.oneway(o);
-                                if (onCompleted != null) {
-                                    onCompleted.run();
-                                }
-                            } catch (IOException e) {
-                                onException(e);
-                            }
-                        }
-                    }
-                });
-            } catch (RejectedExecutionException re) {
-                // Must be shutting down.
-            }
-        }
+    public final void write(final Object o, final CompletionCallback callback) {
+        transport.oneway(o, callback);
     }
 
     final public void onException(IOException error) {
@@ -149,49 +89,21 @@ abstract public class Connection impleme
     }
 
     public void setStopping() {
-        stopping.set(true);
+        stopping = true;
     }
     
     public boolean isStopping() {
-        return stopping.get();
-    }
-
-    public void transportInterupted() {
-    }
-
-    public void transportResumed() {
-    }
-
-    public int getPriorityLevels() {
-        return priorityLevels;
-    }
-
-    public void setPriorityLevels(int priorityLevels) {
-        this.priorityLevels = priorityLevels;
-    }
-
-    public Dispatcher getDispatcher() {
-        return dispatcher;
+        return stopping;
     }
 
-    public void setDispatcher(Dispatcher dispatcher) {
-        this.dispatcher = dispatcher;
+    public void onDisconnected() {
     }
 
-    public int getOutputWindowSize() {
-        return outputWindowSize;
+    public void onConnected() {
     }
 
-    public int getOutputResumeThreshold() {
-        return outputResumeThreshold;
-    }
-
-    public int getInputWindowSize() {
-        return inputWindowSize;
-    }
-
-    public int getInputResumeThreshold() {
-        return inputResumeThreshold;
+    public DispatchQueue getDispatchQueue() {
+        return dispatchQueue;
     }
 
     public Transport getTransport() {
@@ -206,12 +118,4 @@ abstract public class Connection impleme
         this.exceptionListener = exceptionListener;
     }
 
-    public boolean isUseAsyncWriteThread() {
-        return useAsyncWriteThread;
-    }
-
-    public void setUseAsyncWriteThread(boolean useAsyncWriteThread) {
-        this.useAsyncWriteThread = useAsyncWriteThread;
-    }
-
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java Wed Jul  7 03:24:02 2010
@@ -25,9 +25,6 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.activemq.Service;
 import org.apache.activemq.apollo.Connection;
-import org.apache.activemq.dispatch.Dispatcher;
-import org.apache.activemq.dispatch.DispatcherConfig;
-import org.apache.activemq.dispatch.DispatcherAware;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.TransportServer;
@@ -35,6 +32,8 @@ import org.apache.activemq.util.IOHelper
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
 
 public class Broker implements Service {
 
@@ -42,16 +41,14 @@ public class Broker implements Service {
 
 	static final private Log LOG = LogFactory.getLog(Broker.class);
 	
-    public static final int MAX_USER_PRIORITY = 10;
-    public static final int MAX_PRIORITY = MAX_USER_PRIORITY + 1;
-
     private final ArrayList<Connection> clientConnections = new ArrayList<Connection>();
     private final ArrayList<TransportServer> transportServers = new ArrayList<TransportServer>();
     private final ArrayList<String> connectUris = new ArrayList<String>();
 
     private final LinkedHashMap<AsciiBuffer, VirtualHost> virtualHosts = new LinkedHashMap<AsciiBuffer, VirtualHost>();
     private VirtualHost defaultVirtualHost;
-    private Dispatcher dispatcher;
+
+    private DispatchQueue dispatchQueue = Dispatch.createQueue("broker");
     private File dataDirectory;
 
     private final class BrokerAcceptListener implements TransportAcceptListener {
@@ -59,8 +56,6 @@ public class Broker implements Service {
 		    BrokerConnection connection = new BrokerConnection();
 		    connection.setBroker(Broker.this);
 		    connection.setTransport(transport);
-		    connection.setPriorityLevels(MAX_PRIORITY);
-		    connection.setDispatcher(dispatcher);
 		    clientConnections.add(connection);
 		    try {
 		        connection.start();
@@ -125,21 +120,12 @@ public class Broker implements Service {
 		// Create the default virtual host if not explicitly defined.
 		getDefaultVirtualHost();
 
-		// Don't change the state to STARTING yet as we may need to 
-		// apply some default configuration to this broker instance before it's started.
-		if( dispatcher == null ) {
-			int threads = Runtime.getRuntime().availableProcessors();
-			dispatcher = DispatcherConfig.create(getName(), threads);
-		}
-		
 
 	    // Ok now we are ready to start the broker up....
 		if ( !state.compareAndSet(State.CONFIGURATION, State.STARTING) ) {
     		throw new IllegalStateException("Can only start a broker that is in the "+State.CONFIGURATION +" state.  Broker was "+state.get());
     	}
     	try {
-		    dispatcher.resume();
-
 	    	synchronized(virtualHosts) {
 			    for (VirtualHost virtualHost : virtualHosts.values()) {
 			    	virtualHost.setBroker(this);
@@ -184,8 +170,6 @@ public class Broker implements Service {
         for (VirtualHost virtualHost : virtualHosts.values()) {
         	stop(virtualHost);
         }
-        
-        dispatcher.release();
     	state.set(State.STOPPED);
     }
         
@@ -374,17 +358,6 @@ public class Broker implements Service {
     }
 
     // /////////////////////////////////////////////////////////////////
-    // Property Accessors
-    // /////////////////////////////////////////////////////////////////
-    public Dispatcher getDispatcher() {
-        return dispatcher;
-    }
-    public void setDispatcher(Dispatcher dispatcher) {
-    	assertInConfigurationState();
-        this.dispatcher = dispatcher;
-    }
-
-    // /////////////////////////////////////////////////////////////////
     // Helper Methods
     // /////////////////////////////////////////////////////////////////
 
@@ -416,10 +389,8 @@ public class Broker implements Service {
 	}
     
 	private void startTransportServer(TransportServer server) throws Exception {
+        server.setDispatchQueue(dispatchQueue);
 		server.setAcceptListener(new BrokerAcceptListener());
-		if (server instanceof DispatcherAware ) {
-			((DispatcherAware) server).setDispatcher(dispatcher);
-		}
 		server.start();
 	}
 
@@ -449,6 +420,9 @@ public class Broker implements Service {
 		}
 	}
 
+    public DispatchQueue getDispatchQueue() {
+        return dispatchQueue;
+    }
 
 
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Wed Jul  7 03:24:02 2010
@@ -16,40 +16,12 @@
  */
 package org.apache.activemq.apollo.broker;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.activemq.Service;
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store;
-import org.apache.activemq.broker.store.Store.Callback;
-import org.apache.activemq.broker.store.Store.FatalStoreException;
-import org.apache.activemq.broker.store.Store.KeyNotFoundException;
-import org.apache.activemq.broker.store.Store.MessageRecord;
-import org.apache.activemq.broker.store.Store.QueueQueryResult;
-import org.apache.activemq.broker.store.Store.QueueRecord;
-import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.dispatch.Dispatcher;
-import org.apache.activemq.dispatch.DispatcherAware;
-import org.apache.activemq.flow.AbstractLimitedFlowResource;
-import org.apache.activemq.flow.Flow;
-import org.apache.activemq.flow.FlowController;
-import org.apache.activemq.flow.IFlowResource;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.broker.store.Store.*;
+import org.apache.activemq.flow.*;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
-import org.apache.activemq.queue.QueueDescriptor;
 import org.apache.activemq.queue.RestoreListener;
 import org.apache.activemq.queue.RestoredElement;
 import org.apache.activemq.queue.SaveableQueueElement;
@@ -59,8 +31,16 @@ import org.apache.activemq.util.buffer.A
 import org.apache.activemq.util.buffer.Buffer;
 import org.apache.activemq.util.list.LinkedNode;
 import org.apache.activemq.util.list.LinkedNodeList;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
 
-public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.OperationBase<?>> implements Service, DispatcherAware {
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.OperationBase<?>> implements Service {
 
     private static final boolean DEBUG = false;
 
@@ -71,7 +51,7 @@ public class BrokerDatabase extends Abst
     private final FlowController<OperationBase<?>> storeController;
     private final int FLUSH_QUEUE_SIZE = 10000 * 1024;
 
-    private Dispatcher dispatcher;
+    private DispatchQueue dispatcher;
     private Thread flushThread;
     private AtomicBoolean running = new AtomicBoolean(false);
     private DatabaseListener listener;
@@ -328,7 +308,7 @@ public class BrokerDatabase extends Abst
 
         if (requestedDelayedFlushPointer == -1) {
             requestedDelayedFlushPointer = delayedFlushPointer;
-            dispatcher.getGlobalQueue().dispatchAfter(flushDelayCallback, flushDelay, TimeUnit.MILLISECONDS);
+            Dispatch.getGlobalQueue().dispatchAfter(flushDelay, TimeUnit.MILLISECONDS, flushDelayCallback);
         }
 
     }
@@ -566,10 +546,7 @@ public class BrokerDatabase extends Abst
     /**
      * Deletes the given message from the store for the given queue.
      * 
-     * @param storeTracking
-     *            The tracking number of the element being deleted
-     * @param queue
-     *            The queue.
+     * @param queueElement
      * @return The {@link OperationContext} associated with the operation
      */
     public OperationContext<?> deleteQueueElement(SaveableQueueElement<?> queueElement) {
@@ -578,7 +555,7 @@ public class BrokerDatabase extends Abst
 
     /**
      * Loads a batch of messages for the specified queue. The loaded messages
-     * are given the provided {@link MessageRestoreListener}.
+     * are given the provided {@link RestoreListener}.
      * <p>
      * <b><i>NOTE:</i></b> This method uses the queue sequence number for the
      * message not the store tracking number.
@@ -592,7 +569,7 @@ public class BrokerDatabase extends Abst
      *            begining)
      * @param maxSequence
      *            The maximum sequence number to load (-1 if no limit)
-     * @param max
+     * @param maxCount
      *            The maximum number of messages to load (-1 if no limit)
      * @param listener
      *            The listener to which messags should be passed.
@@ -632,7 +609,7 @@ public class BrokerDatabase extends Abst
     /**
      * This interface is used to execute transacted code.
      * 
-     * It is used by the {@link Store#execute(Callback)} method, often as
+     * It is used by the {@link Store#execute(org.apache.activemq.broker.store.Store.Callback, Runnable)} method, often as
      * anonymous class.
      */
     public interface Operation<V> extends OperationContext<V> {
@@ -646,8 +623,7 @@ public class BrokerDatabase extends Abst
         public boolean beginExecute();
 
         /**
-         * Gets called by the
-         * {@link Store#add(Operation, ISourceController, boolean)} method
+         * Gets called by the {@link Store}
          * within a transactional context. If any exception is thrown including
          * Runtime exception, the transaction is rolled back.
          * 
@@ -760,7 +736,7 @@ public class BrokerDatabase extends Abst
 
         /**
          * Gets called by the
-         * {@link Store#add(Operation, ISourceController, boolean)} method
+         * {@link Store}
          * within a transactional context. If any exception is thrown including
          * Runtime exception, the transaction is rolled back.
          * 
@@ -867,7 +843,7 @@ public class BrokerDatabase extends Abst
          * to complete, and then retrieves its result, if available.
          *
          * @param timeout the maximum time to wait
-         * @param unit the time unit of the timeout argument
+         * @param tu the time unit of the timeout argument
          * @return the computed result
          * @throws CancellationException if the computation was cancelled
          * @throws ExecutionException if the computation threw an
@@ -1288,12 +1264,8 @@ public class BrokerDatabase extends Abst
         return store.allocateStoreTracking();
     }
 
-    public Dispatcher getDispatcher() {
-        return dispatcher;
-    }
-
-    public void setDispatcher(Dispatcher dispatcher) {
-        this.dispatcher = dispatcher;
+    public void setDispatchQueue(DispatchQueue queue) {
+        this.dispatcher = queue;
     }
 
     public Store getStore() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java Wed Jul  7 03:24:02 2010
@@ -22,9 +22,9 @@ import java.util.Collection;
 import java.util.HashMap;
 
 import org.apache.activemq.apollo.broker.BrokerDatabase.OperationContext;
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.QueueDescriptor;
 import org.apache.activemq.queue.SaveableQueueElement;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.activemq.util.buffer.Buffer;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Wed Jul  7 03:24:02 2010
@@ -22,9 +22,9 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
-import org.apache.activemq.dispatch.Dispatcher;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.PrioritySizeLimiter;
 import org.apache.activemq.flow.SizeLimiter;
@@ -33,7 +33,6 @@ import org.apache.activemq.queue.IPartit
 import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.queue.PartitionedQueue;
 import org.apache.activemq.queue.PersistencePolicy;
-import org.apache.activemq.queue.QueueDescriptor;
 import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.RestoreListener;
 import org.apache.activemq.queue.SaveableQueueElement;
@@ -44,6 +43,7 @@ import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.fusesource.hawtdispatch.DispatchQueue;
 
 public class BrokerQueueStore implements QueueStore<Long, MessageDelivery> {
 
@@ -52,7 +52,7 @@ public class BrokerQueueStore implements
     private static final boolean USE_PRIORITY_QUEUES = true;
 
     private BrokerDatabase database;
-    private Dispatcher dispatcher;
+    private DispatchQueue dispatchQueue;
 
     private static HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
     private static final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<MessageDelivery>() {
@@ -226,8 +226,8 @@ public class BrokerQueueStore implements
         this.database = database;
     }
 
-    public void setDispatcher(Dispatcher dispatcher) {
-        this.dispatcher = dispatcher;
+    public void setDispatchQueue(DispatchQueue dispatchQueue) {
+        this.dispatchQueue = dispatchQueue;
     }
 
     public void loadQueues() throws Exception {
@@ -380,7 +380,6 @@ public class BrokerQueueStore implements
             }
         };
         queue = new ExclusivePersistentQueue<Long, MessageDelivery>(name, limiter);
-        queue.setDispatcher(dispatcher);
         queue.setStore(this);
         queue.setPersistencePolicy(DURABLE_QUEUE_PERSISTENCE_POLICY);
         queue.setExpirationMapper(EXPIRATION_MAPPER);
@@ -409,7 +408,7 @@ public class BrokerQueueStore implements
             break;
         }
         case QueueDescriptor.SHARED_PRIORITY: {
-            PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD, Broker.MAX_PRIORITY);
+            PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD, 10);
             limiter.setPriorityMapper(PRIORITY_MAPPER);
             limiter.setSizeMapper(SIZE_MAPPER);
             SharedPriorityQueue<Long, MessageDelivery> queue = new SharedPriorityQueue<Long, MessageDelivery>(name, limiter);
@@ -444,7 +443,6 @@ public class BrokerQueueStore implements
         }
         }
         ret.getDescriptor().setApplicationType(SUBPARTITION_TYPE);
-        ret.setDispatcher(dispatcher);
         ret.setStore(this);
         ret.setPersistencePolicy(SHARED_QUEUE_PERSISTENCE_POLICY);
         ret.setExpirationMapper(EXPIRATION_MAPPER);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java Wed Jul  7 03:24:02 2010
@@ -98,7 +98,6 @@ class TopicSubscription implements Broke
 		String name = subscription.getResourceName();
 		IFlowLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 50);
 		ExclusiveQueue<MessageDelivery> queue = new ExclusiveQueue<MessageDelivery>(flow, name, limiter);
-		queue.setDispatcher(host.getBroker().getDispatcher());
 		queue.setDrain( new QueueDispatchTarget<MessageDelivery>() {
             public void drain(MessageDelivery elem, ISourceController<MessageDelivery> controller) {
                 subscription.add(elem, controller);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java Wed Jul  7 03:24:02 2010
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.activemq.apollo.broker.BrokerDatabase.OperationContext;
 import org.apache.activemq.apollo.broker.Transaction.TxOp;
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
 import org.apache.activemq.flow.ISourceController;
@@ -31,7 +32,6 @@ import org.apache.activemq.flow.SizeLimi
 import org.apache.activemq.queue.ExclusivePersistentQueue;
 import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.queue.PersistencePolicy;
-import org.apache.activemq.queue.QueueDescriptor;
 import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.RestoreListener;
 import org.apache.activemq.queue.SaveableQueueElement;
@@ -314,7 +314,6 @@ public class TransactionManager {
             }
         };
         queue = new ExclusivePersistentQueue<Long, TxOp>(name, limiter);
-        queue.setDispatcher(host.getBroker().getDispatcher());
         queue.setStore(txStore);
         queue.setPersistencePolicy(DEFAULT_TX_QUEUE_PERSISTENCE_POLICY);
         queue.setExpirationMapper(EXPIRATION_MAPPER);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java Wed Jul  7 03:24:02 2010
@@ -31,6 +31,8 @@ import org.apache.activemq.dispatch.inte
 import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
 
 /**
  * @author chirino
@@ -48,6 +50,8 @@ public class VirtualHost implements Serv
     private BrokerDatabase database;
     private TransactionManager txnManager;
 
+    private DispatchQueue dispatchQueue = Dispatch.createQueue("virtual-host");
+
     public VirtualHost() {
         this.router.setVirtualHost(this);
     }
@@ -117,14 +121,14 @@ public class VirtualHost implements Serv
             database = new BrokerDatabase(store);
         }
 
-        database.setDispatcher(broker.getDispatcher());
+        database.setDispatchQueue(broker.getDispatchQueue());
         database.start();
 
         router.setDatabase(database);
 
         //Recover queues:
         queueStore.setDatabase(database);
-        queueStore.setDispatcher(broker.getDispatcher());
+        queueStore.setDispatchQueue(dispatchQueue);
         queueStore.loadQueues();
 
         // Create Queue instances
@@ -277,14 +281,14 @@ public class VirtualHost implements Serv
         /**
          * A destination has bean created
          * 
-         * @param destination
+         * @param queue
          */
         public void onCreate(Queue queue);
 
         /**
          * A destination has bean destroyed
          * 
-         * @param destination
+         * @param queue
          */
         public void onDestroy(Queue queue);
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java Wed Jul  7 03:24:02 2010
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -29,14 +30,18 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.pipe.Pipe;
+import org.apache.activemq.transport.pipe.PipeTransport;
 import org.apache.activemq.transport.pipe.PipeTransportFactory;
+import org.apache.activemq.transport.pipe.PipeTransportServer;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.URISupport;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import static org.apache.activemq.transport.TransportFactorySupport.configure;
+import static org.apache.activemq.transport.TransportFactorySupport.verify;
+
 /**
  * Implements the vm transport which behaves like the pipe transport except that
  * it can start embedded brokers up on demand.  
@@ -46,7 +51,8 @@ import org.apache.commons.logging.LogFac
  */
 public class VMTransportFactory extends PipeTransportFactory {
 	static final private Log LOG = LogFactory.getLog(VMTransportFactory.class);
-	
+    private static final String DEFAULT_PIPE_NAME = Broker.DEFAULT_VIRTUAL_HOST_NAME.toString();
+
 	/**
 	 * This extension of the PipeTransportServer shuts down the broker
 	 * when all the connections are disconnected.
@@ -58,10 +64,9 @@ public class VMTransportFactory extends 
 		private Broker broker;
 
 		@Override
-		protected PipeTransport createClientTransport(Pipe<Object> pipe) {
+		protected PipeTransport createClientTransport() {
 			refs.incrementAndGet();
-
-			return new PipeTransport(pipe) {
+			return new PipeTransport(this) {
 				AtomicBoolean stopped = new AtomicBoolean();
 				@Override
 				public void stop() throws Exception {
@@ -90,20 +95,13 @@ public class VMTransportFactory extends 
 		}
 	}
 
-
-	private static final String DEFAULT_PIPE_NAME = Broker.DEFAULT_VIRTUAL_HOST_NAME.toString();
-
-	@Override
-	public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-		// Wishing right now the options would have been passed to the createTransport(URI location, WireFormat wf) method so we did don't
-		// need to remove these here.
-		options.remove("create");
-		options.remove("broker");
-		return super.compositeConfigure(transport, format, options);
+    @Override
+    public TransportServer bind(URI uri) {
+		return new VmTransportServer();
 	}
-	
-	@Override
-    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
+
+    @Override
+    public Transport connect(URI location) throws IOException {
 		try {
 			
 			String brokerURI = null;
@@ -147,7 +145,6 @@ public class VMTransportFactory extends 
 				// We want to use a vm transport server impl.
 				VmTransportServer vmTransportServer = (VmTransportServer) TransportFactory.bind(new URI("vm://" + name+"?wireFormat=null"));
 				vmTransportServer.setBroker(broker);
-				vmTransportServer.setWireFormatFactory(wf.getWireFormatFactory());
 				broker.addTransportServer(vmTransportServer);
 				broker.start();
 				
@@ -159,9 +156,8 @@ public class VMTransportFactory extends 
 			}
 			
 	        PipeTransport transport = server.connect();
-	        transport.setWireFormat(wf);
-	        return transport;
-			
+            return verify( configure(transport, options), options);
+
 		} catch (URISyntaxException e) {
 			throw IOExceptionSupport.create(e);
 		} catch (Exception e) {
@@ -169,10 +165,4 @@ public class VMTransportFactory extends 
 		}
 	}
 
-
-	@Override
-	protected PipeTransportServer createTransportServer() {
-		return new VmTransportServer();
-	}
-		
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java Wed Jul  7 03:24:02 2010
@@ -36,7 +36,7 @@ public class VMTransportTest {
 	
 	@Test()
 	public void autoCreateBroker() throws Exception {
-		Transport connect = TransportFactory.compositeConnect(new URI("vm://test1?wireFormat=mock"));
+		Transport connect = TransportFactory.connect(new URI("vm://test1?wireFormat=mock"));
 		connect.start();
 		assertNotNull(connect);
 		connect.stop();
@@ -44,12 +44,12 @@ public class VMTransportTest {
 	
 	@Test(expected=IOException.class)
 	public void noAutoCreateBroker() throws Exception {
-		TransportFactory.compositeConnect(new URI("vm://test2?create=false&wireFormat=mock"));
+		TransportFactory.connect(new URI("vm://test2?create=false&wireFormat=mock"));
 	}
 	
 	@Test(expected=IllegalArgumentException.class)
 	public void badOptions() throws Exception {
-		TransportFactory.compositeConnect(new URI("vm://test3?crazy-option=false&wireFormat=mock"));
+		TransportFactory.connect(new URI("vm://test3?crazy-option=false&wireFormat=mock"));
 	}
 	
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Wed Jul  7 03:24:02 2010
@@ -476,7 +476,6 @@ public abstract class BrokerTestBase {
         consumer.setDestination(destination);
         consumer.setName("consumer" + (i + 1));
         consumer.setTotalConsumerRate(totalConsumerRate);
-        consumer.setDispatcher(dispatcher);
         return consumer;
     }
 
@@ -498,7 +497,6 @@ public abstract class BrokerTestBase {
         producer.setDestination(destination);
         producer.setMessageIdGenerator(msgIdGenerator);
         producer.setTotalProducerRate(totalProducerRate);
-        producer.setDispatcher(dispatcher);
         return producer;
     }
 
@@ -508,7 +506,6 @@ public abstract class BrokerTestBase {
         Broker broker = new Broker();
         broker.addTransportServer(TransportFactory.bind(new URI(bindURI)));
         broker.addConnectUri(connectUri);
-        broker.setDispatcher(dispatcher);
         broker.getDefaultVirtualHost().setStore(createStore(broker));
         return broker;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java Wed Jul  7 03:24:02 2010
@@ -9,7 +9,6 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;
-import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.TransportFactory;
 
 abstract public class RemoteConsumer extends Connection {
@@ -29,11 +28,8 @@ abstract public class RemoteConsumer ext
         consumerRate.name("Consumer " + name + " Rate");
         totalConsumerRate.add(consumerRate);
 
-        transport = TransportFactory.compositeConnect(uri);
-        if(transport instanceof DispatchableTransport)
-        {
-            schedualWait = true;
-        }
+        transport = TransportFactory.connect(uri);
+        schedualWait = true;
         initialize();
         super.start();
         setupSubscription();
@@ -46,12 +42,12 @@ abstract public class RemoteConsumer ext
     protected void messageReceived(final ISourceController<MessageDelivery> controller, final MessageDelivery elem) {
         if( schedualWait ) {
             if (thinkTime > 0) {
-                getDispatcher().getGlobalQueue().dispatchAfter(new Runnable(){
+                dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, new Runnable(){
                     public void run() {
                         consumerRate.increment();
                         controller.elementDispatched(elem);
                     }
-                }, thinkTime, TimeUnit.MILLISECONDS);
+                });
                 
             }
             else

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java Wed Jul  7 03:24:02 2010
@@ -31,7 +31,6 @@ abstract public class RemoteProducer ext
     protected String property;
     protected MetricAggregator totalProducerRate;
     protected MessageDelivery next;
-    protected DispatchQueue dispatchQueue;
     protected Runnable dispatchTask;
     protected String filler;
     protected int payloadSize = 20;
@@ -55,13 +54,12 @@ abstract public class RemoteProducer ext
         totalProducerRate.add(rate);
 
 
-        transport = TransportFactory.compositeConnect(uri);
+        transport = TransportFactory.connect(uri);
         initialize();
         super.start();
         
         setupProducer();
         
-        dispatchQueue = getDispatcher().createSerialQueue(name + "-client", STICK_TO_CALLER_THREAD);
         dispatchTask = new Runnable(){
             public void run() {
                 dispatch();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Wed Jul  7 03:24:02 2010
@@ -26,9 +26,9 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.apollo.broker.MessageDelivery;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
-import org.apache.activemq.dispatch.Dispatcher;
-import org.apache.activemq.dispatch.DispatcherConfig;
 import org.apache.activemq.queue.IQueue;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
 
 /**
  * @author cmacnaug
@@ -37,7 +37,7 @@ import org.apache.activemq.queue.IQueue;
 public class SharedQueueTest extends TestCase {
 
 
-    Dispatcher dispatcher;
+    DispatchQueue dispatchQueue;
     BrokerDatabase database;
     BrokerQueueStore queueStore;
     private static final boolean USE_KAHA_DB = true;
@@ -46,8 +46,8 @@ public class SharedQueueTest extends Tes
 
     protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
 
-    protected Dispatcher createDispatcher() {
-        return DispatcherConfig.create("test", Runtime.getRuntime().availableProcessors());
+    protected DispatchQueue createDispatcher() {
+        return Dispatch.createQueue();
     }
 
     protected int consumerStartDelay = 0;
@@ -63,20 +63,19 @@ public class SharedQueueTest extends Tes
     }
     
     protected void startServices() throws Exception {
-        dispatcher = createDispatcher();
-        dispatcher.resume();
+        dispatchQueue = createDispatcher();
+        dispatchQueue.resume();
         database = new BrokerDatabase(createStore());
-        database.setDispatcher(dispatcher);
+        database.setDispatchQueue(dispatchQueue);
         database.start();
         queueStore = new BrokerQueueStore();
         queueStore.setDatabase(database);
-        queueStore.setDispatcher(dispatcher);
         queueStore.loadQueues();
     }
 
     protected void stopServices() throws Exception {
         database.stop();
-        dispatcher.release();
+        dispatchQueue.release();
         queues.clear();
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java Wed Jul  7 03:24:02 2010
@@ -89,6 +89,7 @@ import org.apache.activemq.management.St
 import org.apache.activemq.management.StatsImpl;
 import org.apache.activemq.state.CommandVisitorAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transport.ResponseCorrelator;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.IdGenerator;
@@ -148,7 +149,7 @@ public class ActiveMQConnection implemen
     private int sendTimeout =0;
     private boolean sendAcksAsync=true;
 
-    private final Transport transport;
+    private final ResponseCorrelator transport;
     private final IdGenerator clientIdGenerator;
     private final JMSStatsImpl factoryStats;
     private final JMSConnectionStatsImpl stats;
@@ -196,7 +197,7 @@ public class ActiveMQConnection implemen
      */
     protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
 
-        this.transport = transport;
+        this.transport = new ResponseCorrelator(transport);
         this.clientIdGenerator = clientIdGenerator;
         this.factoryStats = factoryStats;
 
@@ -1221,11 +1222,11 @@ public class ActiveMQConnection implemen
     }
 
 	private void doAsyncSendPacket(Command command) throws JMSException {
-		try {
+//		try {
 		    this.transport.oneway(command);
-		} catch (IOException e) {
-		    throw JMSExceptionSupport.create(e);
-		}
+//		} catch (IOException e) {
+//		    throw JMSExceptionSupport.create(e);
+//		}
 	}
 
     /**
@@ -1820,21 +1821,21 @@ public class ActiveMQConnection implemen
 		}
 	}
 
-    public void transportInterupted() {
+    public void onDisconnected() {
         for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
             ActiveMQSession s = i.next();
             s.clearMessagesInProgress();
         }
         for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
             TransportListener listener = iter.next();
-            listener.transportInterupted();
+            listener.onDisconnected();
         }
     }
 
-    public void transportResumed() {
+    public void onConnected() {
         for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
             TransportListener listener = iter.next();
-            listener.transportResumed();
+            listener.onConnected();
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Wed Jul  7 03:24:02 2010
@@ -238,7 +238,7 @@ public class ActiveMQConnectionFactory e
      */
     protected Transport createTransport() throws JMSException {
         try {
-            return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR);
+            return TransportFactory.connect(brokerURL);
         } catch (Exception e) {
             throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java Wed Jul  7 03:24:02 2010
@@ -43,15 +43,10 @@ public class BrokerXml {
     private List<String> transportServers = new ArrayList<String>();
     @XmlElement(name="connect-uri")
     private List<String> connectUris = new ArrayList<String>();
-    @XmlElement(required = false)
-    private DispatcherXml dispatcher;
 	
 	
 	public Broker createMessageBroker() throws Exception {
 		Broker rc = new Broker();
-		if( dispatcher!=null ) {
-			rc.setDispatcher(dispatcher.createDispatcher(this));
-		}
 		for (VirtualHostXml element : virtualHosts) {
 			rc.addVirtualHost(element.createVirtualHost(this));
 		}
@@ -98,13 +93,4 @@ public class BrokerXml {
 		this.connectUris = connectUris;
 	}
 
-
-	public DispatcherXml getDispatcher() {
-		return dispatcher;
-	}
-	public void setDispatcher(DispatcherXml dispatcher) {
-		this.dispatcher = dispatcher;
-	}
-
-
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java Wed Jul  7 03:24:02 2010
@@ -44,7 +44,6 @@ public class JAXBConfigTest extends Test
 		LOG.info("Loading broker configuration from the classpath with URI: " + uri);
 		Broker broker = BrokerFactory.createBroker(uri);
 		
-		Dispatcher p = (Dispatcher)broker.getDispatcher();
 //		assertEquals(4, p.getSize());
 //		assertEquals("test dispatcher", p.getName());
 		

Modified: activemq/sandbox/activemq-apollo-actor/activemq-kaha/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-kaha/pom.xml?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-kaha/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-kaha/pom.xml Wed Jul  7 03:24:02 2010
@@ -25,11 +25,11 @@
   </parent>
 
   <groupId>org.apache.activemq</groupId>
-  <artifactId>activemq-kaha</artifactId>
+  <artifactId>activemq-hawtdb</artifactId>
   <packaging>jar</packaging>
   <version>6.0-SNAPSHOT</version>
 
-  <name>ActiveMQ :: ${artifactId} :: ${version}</name>
+  <name>ActiveMQ :: HawtDB</name>
 
   <dependencies>
 
@@ -40,8 +40,9 @@
     </dependency>
     
     <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>kahadb</artifactId>
+      <groupId>org.fusesource.hawtdb</groupId>
+      <artifactId>hawtdb</artifactId>
+      <version>${hawtdb-version}</version>
       <optional>false</optional>
     </dependency>