You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:24:44 UTC
svn commit: r961062 [3/14] - in /activemq/sandbox/activemq-apollo-actor: ./
activemq-all/ activemq-all/src/test/ide-resources/
activemq-all/src/test/java/org/apache/activemq/jaxb/
activemq-all/src/test/java/org/apache/activemq/legacy/ activemq-all/src/...
Modified: activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Jul 7 03:24:02 2010
@@ -16,309 +16,135 @@
*/
package org.apache.activemq.transport.tcp;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ServerSocketFactory;
-
-import org.apache.activemq.Service;
-//import org.apache.activemq.ThreadPriorities;
import org.apache.activemq.transport.Transport;
-//import org.apache.activemq.transport.TransportLoggerFactory;
+import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.TransportServerThreadSupport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.ServiceListener;
-import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.DispatchSource;
+
+import java.io.IOException;
+import java.net.*;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
/**
* A TCP based implementation of {@link TransportServer}
- *
- * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
- * @version $Revision: 1.1 $
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{
+public class TcpTransportServer implements TransportServer {
- private static final Log LOG = LogFactory.getLog(TcpTransportServer.class);
- protected ServerSocket serverSocket;
- protected int backlog = 5000;
protected WireFormatFactory wireFormatFactory;
- protected final TcpTransportFactory transportFactory;
- protected long maxInactivityDuration = 30000;
- protected long maxInactivityDurationInitalDelay = 10000;
- protected int minmumWireFormatVersion;
- protected boolean useQueueForAccept=true;
-
- /**
- * trace=true -> the Transport stack where this TcpTransport
- * object will be, will have a TransportLogger layer
- * trace=false -> the Transport stack where this TcpTransport
- * object will be, will NOT have a TransportLogger layer, and therefore
- * will never be able to print logging messages.
- * This parameter is most probably set in Connection or TransportConnector URIs.
- */
- protected boolean trace = false;
-
- protected int soTimeout = 0;
- protected int socketBufferSize = 64 * 1024;
- protected int connectionTimeout = 30000;
+ private ServerSocketChannel channel;
+ private TransportAcceptListener listener;
+ private URI bindURI;
+ private URI connectURI;
+ private DispatchQueue dispatchQueue;
+ private DispatchSource acceptSource;
+ private int backlog = 500;
+ private Map<String, Object> transportOptions;
- /**
- * Name of the LogWriter implementation to use.
- * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
- * This parameter is most probably set in Connection or TransportConnector URIs.
-
- protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
- */
-
- /**
- * Specifies if the TransportLogger will be manageable by JMX or not.
- * Also, as long as there is at least 1 TransportLogger which is manageable,
- * a TransportLoggerControl MBean will me created.
- */
- protected boolean dynamicManagement = false;
- /**
- * startLogging=true -> the TransportLogger object of the Transport stack
- * will initially write messages to the log.
- * startLogging=false -> the TransportLogger object of the Transport stack
- * will initially NOT write messages to the log.
- * This parameter only has an effect if trace == true.
- * This parameter is most probably set in Connection or TransportConnector URIs.
- */
- protected boolean startLogging = true;
- protected Map<String, Object> transportOptions;
- protected final ServerSocketFactory serverSocketFactory;
- protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
- protected Thread socketHandlerThread;
- /**
- * The maximum number of sockets allowed for this server
- */
- protected int maximumConnections = Integer.MAX_VALUE;
- protected int currentTransportCount=0;
-
- public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
- super(location);
- this.transportFactory = transportFactory;
- this.serverSocketFactory = serverSocketFactory;
-
+ public TcpTransportServer(URI location) {
+ this.bindURI = location;
}
- public void bind() throws IOException {
- URI bind = getBindLocation();
-
- String host = bind.getHost();
- host = (host == null || host.length() == 0) ? "localhost" : host;
- InetAddress addr = InetAddress.getByName(host);
-
- try {
-
- this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
- configureServerSocket(this.serverSocket);
-
- } catch (IOException e) {
- throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
- }
- try {
- setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
- .getFragment()));
- } catch (URISyntaxException e) {
-
- // it could be that the host name contains invalid characters such
- // as _ on unix platforms
- // so lets try use the IP address instead
- try {
- setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
- } catch (URISyntaxException e2) {
- throw IOExceptionSupport.create(e2);
- }
- }
+ public void setAcceptListener(TransportAcceptListener listener) {
+ this.listener = listener;
}
- private void configureServerSocket(ServerSocket socket) throws SocketException {
- socket.setSoTimeout(2000);
- if (transportOptions != null) {
- IntrospectionSupport.setProperties(socket, transportOptions);
- }
+ public URI getConnectURI() {
+ return connectURI;
}
- /**
- * @return Returns the wireFormatFactory.
- */
- public WireFormatFactory getWireFormatFactory() {
- return wireFormatFactory;
- }
-
- /**
- * @param wireFormatFactory The wireFormatFactory to set.
- */
- public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
- this.wireFormatFactory = wireFormatFactory;
- }
-
- public long getMaxInactivityDuration() {
- return maxInactivityDuration;
- }
-
- public void setMaxInactivityDuration(long maxInactivityDuration) {
- this.maxInactivityDuration = maxInactivityDuration;
- }
-
- public long getMaxInactivityDurationInitalDelay() {
- return this.maxInactivityDurationInitalDelay;
- }
-
- public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
- this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
- }
-
- public int getMinmumWireFormatVersion() {
- return minmumWireFormatVersion;
- }
-
- public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
- this.minmumWireFormatVersion = minmumWireFormatVersion;
- }
-
- public boolean isTrace() {
- return trace;
+ public InetSocketAddress getSocketAddress() {
+ return (InetSocketAddress) channel.socket().getLocalSocketAddress();
}
- public void setTrace(boolean trace) {
- this.trace = trace;
+ public DispatchQueue getDispatchQueue() {
+ return dispatchQueue;
}
-//
-// public String getLogWriterName() {
-// return logWriterName;
-// }
-//
-// public void setLogWriterName(String logFormat) {
-// this.logWriterName = logFormat;
-// }
- public boolean isDynamicManagement() {
- return dynamicManagement;
+ public void setDispatchQueue(DispatchQueue dispatchQueue) {
+ this.dispatchQueue = dispatchQueue;
}
- public void setDynamicManagement(boolean useJmx) {
- this.dynamicManagement = useJmx;
+ /**
+ * Suspends the accept source so that it can later be re-enabled with
+ * {@link #resume()}.
+ */
+ public void suspend() {
+ // Previously called acceptSource.resume(), which made suspend() and resume() identical.
+ acceptSource.suspend();
}
- public boolean isStartLogging() {
- return startLogging;
+ public void resume() {
+ acceptSource.resume();
}
-
- public void setStartLogging(boolean startLogging) {
- this.startLogging = startLogging;
- }
-
- /**
- * @return the backlog
- */
- public int getBacklog() {
- return backlog;
+ /**
+ * Binds the server channel, creates an OP_ACCEPT dispatch source for it on the
+ * configured dispatch queue, installs the accept and cancel handlers and
+ * resumes the source.
+ *
+ * @throws IOException if {@link #bind()} fails
+ */
+ public void start() throws IOException {
+ bind();
+ acceptSource = Dispatch.createSource(channel, SelectionKey.OP_ACCEPT, dispatchQueue);
+ acceptSource.setEventHandler(new Runnable() {
+ public void run() {
+ try {
+ SocketChannel client = channel.accept();
+ // accept() returns null on a non-blocking channel when no connection is pending
+ if (client != null) {
+ handleSocket(client);
+ }
+ } catch (IOException e) {
+ listener.onAcceptError(e);
+ }
+ }
+ });
+ acceptSource.setCancelHandler(new Runnable() {
+ public void run() {
+ try {
+ channel.close();
+ } catch (IOException ignored) {
+ // best effort: the channel is being discarded, nothing useful can be done here
+ }
+ }
+ });
+ acceptSource.resume();
}
- /**
- * @param backlog the backlog to set
- */
- public void setBacklog(int backlog) {
- this.backlog = backlog;
- }
+ public void bind() throws IOException {
+ URI bind = bindURI;
- /**
- * @return the useQueueForAccept
- */
- public boolean isUseQueueForAccept() {
- return useQueueForAccept;
- }
+ String host = bind.getHost();
+ host = (host == null || host.length() == 0) ? "localhost" : host;
+ if (host.equals("localhost")) {
+ host = "0.0.0.0";
+ }
- /**
- * @param useQueueForAccept the useQueueForAccept to set
- */
- public void setUseQueueForAccept(boolean useQueueForAccept) {
- this.useQueueForAccept = useQueueForAccept;
- }
-
+ InetAddress addr = InetAddress.getByName(host);
+ try {
+ channel = ServerSocketChannel.open();
+ channel.socket().bind(new InetSocketAddress(addr, bind.getPort()), backlog);
+ } catch (IOException e) {
+ throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
+ }
- /**
- * pull Sockets from the ServerSocket
- */
- public void run() {
- while (!isStopped()) {
- Socket socket = null;
+ try {
+ connectURI = connectURI(resolveHostName(channel.socket(), addr));
+ } catch (URISyntaxException e) {
+ // it could be that the host name contains invalid characters such
+ // as _ on unix platforms
+ // so lets try use the IP address instead
try {
- socket = serverSocket.accept();
- if (socket != null) {
- if (isStopped() || getAcceptListener() == null) {
- socket.close();
- } else {
- if (useQueueForAccept) {
- socketQueue.put(socket);
- }else {
- handleSocket(socket);
- }
- }
- }
- } catch (SocketTimeoutException ste) {
- // expect this to happen
- } catch (Exception e) {
- if (!isStopping()) {
- onAcceptError(e);
- } else if (!isStopped()) {
- LOG.warn("run()", e);
- onAcceptError(e);
- }
+ connectURI = connectURI(addr.getHostAddress());
+ } catch (URISyntaxException e2) {
+ throw IOExceptionSupport.create(e2);
}
}
}
- /**
- * Allow derived classes to override the Transport implementation that this
- * transport server creates.
- *
- * @param socket
- * @param format
- * @return
- * @throws IOException
- */
- protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
- return new TcpTransport(format, socket);
- }
-
- /**
- * @return pretty print of this
- */
- public String toString() {
- return "" + getBindLocation();
+ private URI connectURI(String hostname) throws URISyntaxException {
+ return new URI(bindURI.getScheme(), bindURI.getUserInfo(), hostname, channel.socket().getLocalPort(), bindURI.getPath(), bindURI.getQuery(), bindURI.getFragment());
}
- /**
- * @param socket
- * @param inetAddress
- * @return real hostName
- * @throws UnknownHostException
- */
protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
String result = null;
if (socket.isBound()) {
@@ -333,144 +159,426 @@ public class TcpTransportServer extends
}
return result;
}
-
- protected void doStart() throws Exception {
- if(useQueueForAccept) {
- Runnable run = new Runnable() {
- public void run() {
- try {
- while (!isStopped() && !isStopping()) {
- Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
- if (sock != null) {
- handleSocket(sock);
- }
- }
-
- } catch (InterruptedException e) {
- LOG.info("socketQueue interuppted - stopping");
- if (!isStopping()) {
- onAcceptError(e);
- }
- }
-
- }
-
- };
- socketHandlerThread = new Thread(null, run,
- "ActiveMQ Transport Server Thread Handler: " + toString(),
- getStackSize());
- socketHandlerThread.setDaemon(true);
- //socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
- socketHandlerThread.start();
- }
- super.doStart();
-
- }
- protected void doStop(ServiceStopper stopper) throws Exception {
- super.doStop(stopper);
- if (serverSocket != null) {
- serverSocket.close();
- }
+ /**
+ * Stops the server by releasing the accept source created in {@link #start()}.
+ *
+ * NOTE(review): acceptSource is only assigned in start(), so calling this
+ * before start() dereferences null. The server channel is closed only by the
+ * cancel handler installed in start(); confirm that releasing the source
+ * actually triggers that handler.
+ */
+ public void stop() throws Exception {
+ acceptSource.release();
}
- public InetSocketAddress getSocketAddress() {
- return (InetSocketAddress)serverSocket.getLocalSocketAddress();
+ public WireFormatFactory getWireFormatFactory() {
+ return wireFormatFactory;
}
- public void setTransportOption(Map<String, Object> transportOptions) {
- this.transportOptions = transportOptions;
+ public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
+ this.wireFormatFactory = wireFormatFactory;
}
-
- protected final void handleSocket(Socket socket) {
- try {
- if (this.currentTransportCount >= this.maximumConnections) {
-
- }else {
- HashMap<String, Object> options = new HashMap<String, Object>();
- options.put("maxInactivityDuration", Long
- .valueOf(maxInactivityDuration));
- options.put("maxInactivityDurationInitalDelay", Long
- .valueOf(maxInactivityDurationInitalDelay));
- options.put("minmumWireFormatVersion", Integer
- .valueOf(minmumWireFormatVersion));
- options.put("trace", Boolean.valueOf(trace));
- options.put("soTimeout", Integer.valueOf(soTimeout));
- options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
- options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
-// options.put("logWriterName", logWriterName);
- options
- .put("dynamicManagement", Boolean
- .valueOf(dynamicManagement));
- options.put("startLogging", Boolean.valueOf(startLogging));
-
- options.putAll(transportOptions);
- WireFormat format = wireFormatFactory.createWireFormat();
- Transport transport = createTransport(socket, format);
- if (transport instanceof ServiceSupport) {
- ((ServiceSupport) transport).addServiceListener(this);
- }
- Transport configuredTransport = transportFactory.serverConfigure(
- transport, format, options);
- getAcceptListener().onAccept(configuredTransport);
- }
- } catch (SocketTimeoutException ste) {
- // expect this to happen
- } catch (Exception e) {
- if (!isStopping()) {
- onAcceptError(e);
- } else if (!isStopped()) {
- LOG.warn("run()", e);
- onAcceptError(e);
- }
- }
-
- }
- public int getSoTimeout() {
- return soTimeout;
+ public URI getBindURI() {
+ return bindURI;
}
- public void setSoTimeout(int soTimeout) {
- this.soTimeout = soTimeout;
+ public void setBindURI(URI bindURI) {
+ this.bindURI = bindURI;
}
- public int getSocketBufferSize() {
- return socketBufferSize;
+ public int getBacklog() {
+ return backlog;
}
- public void setSocketBufferSize(int socketBufferSize) {
- this.socketBufferSize = socketBufferSize;
+ public void setBacklog(int backlog) {
+ this.backlog = backlog;
}
- public int getConnectionTimeout() {
- return connectionTimeout;
+ /**
+ * Wraps a newly accepted socket channel in a transport, gives it a wire format
+ * from the configured factory and hands it to the accept listener.
+ * If the transport cannot be set up, the accepted channel is closed before the
+ * failure is propagated so that it does not leak.
+ *
+ * @param socket the accepted client channel
+ * @throws IOException if the transport cannot be created
+ */
+ protected final void handleSocket(SocketChannel socket) throws IOException {
+ HashMap<String, Object> options = new HashMap<String, Object>();
+// options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
+// options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
+// options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
+// options.put("trace", Boolean.valueOf(trace));
+// options.put("soTimeout", Integer.valueOf(soTimeout));
+// options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
+// options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
+// options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
+// options.put("startLogging", Boolean.valueOf(startLogging));
+
+ Transport transport = null;
+ boolean configured = false;
+ try {
+ transport = createTransport(socket, options);
+ transport.setWireformat(wireFormatFactory.createWireFormat());
+ configured = true;
+ } finally {
+ if (!configured) {
+ // setup failed: nobody else owns the channel yet, so close it here
+ try {
+ socket.close();
+ } catch (IOException ignored) {
+ // best effort; the original failure is the one that matters
+ }
+ }
+ }
+ listener.onAccept(transport);
+ }
+
+ /**
+ * Builds a TcpTransport around an accepted channel, then applies the supplied
+ * per-connection options followed by the server-wide transport options.
+ *
+ * @param socketChannel the accepted client channel
+ * @param options properties to set on the new transport; may be null
+ * @return the transport wrapping the channel
+ */
+ private Transport createTransport(SocketChannel socketChannel, HashMap<String, Object> options) throws IOException {
+ final TcpTransport tcp = new TcpTransport();
+ tcp.connected(socketChannel);
+ if (null != options) {
+ IntrospectionSupport.setProperties(tcp, options);
+ }
+ if (null != transportOptions) {
+ IntrospectionSupport.setProperties(tcp, transportOptions);
+ }
+ return tcp;
}
- public void setConnectionTimeout(int connectionTimeout) {
- this.connectionTimeout = connectionTimeout;
+ public void setTransportOption(Map<String, Object> transportOptions) {
+ this.transportOptions = transportOptions;
}
- /**
- * @return the maximumConnections
- */
- public int getMaximumConnections() {
- return maximumConnections;
- }
+
+
+// private static final Log LOG = LogFactory.getLog(TcpTransportServer.class);
+// protected ServerSocket serverSocket;
+// protected int backlog = 5000;
+// protected WireFormatFactory wireFormatFactory;
+// protected final TcpTransportFactory transportFactory;
+// protected long maxInactivityDuration = 30000;
+// protected long maxInactivityDurationInitalDelay = 10000;
+// protected int minmumWireFormatVersion;
+// protected boolean useQueueForAccept=true;
+//
+// /**
+// * trace=true -> the Transport stack where this TcpTransport
+// * object will be, will have a TransportLogger layer
+// * trace=false -> the Transport stack where this TcpTransport
+// * object will be, will NOT have a TransportLogger layer, and therefore
+// * will never be able to print logging messages.
+// * This parameter is most probably set in Connection or TransportConnector URIs.
+// */
+// protected boolean trace = false;
+//
+// protected int soTimeout = 0;
+// protected int socketBufferSize = 64 * 1024;
+// protected int connectionTimeout = 30000;
+//
+// /**
+// * Name of the LogWriter implementation to use.
+// * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
+// * This parameter is most probably set in Connection or TransportConnector URIs.
+//
+// protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
+// */
+//
+// /**
+// * Specifies if the TransportLogger will be manageable by JMX or not.
+// * Also, as long as there is at least 1 TransportLogger which is manageable,
+// * a TransportLoggerControl MBean will me created.
+// */
+// protected boolean dynamicManagement = false;
+// /**
+// * startLogging=true -> the TransportLogger object of the Transport stack
+// * will initially write messages to the log.
+// * startLogging=false -> the TransportLogger object of the Transport stack
+// * will initially NOT write messages to the log.
+// * This parameter only has an effect if trace == true.
+// * This parameter is most probably set in Connection or TransportConnector URIs.
+// */
+// protected boolean startLogging = true;
+// protected Map<String, Object> transportOptions;
+// protected final ServerSocketFactory serverSocketFactory;
+// protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
+// protected Thread socketHandlerThread;
+// /**
+// * The maximum number of sockets allowed for this server
+// */
+// protected int maximumConnections = Integer.MAX_VALUE;
+// protected int currentTransportCount=0;
+//
+// public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+// super(location);
+// this.transportFactory = transportFactory;
+// this.serverSocketFactory = serverSocketFactory;
+//
+// }
+//
+// public void bind() throws IOException {
+// URI bind = getBindLocation();
+//
+// String host = bind.getHost();
+// host = (host == null || host.length() == 0) ? "localhost" : host;
+// InetAddress addr = InetAddress.getByName(host);
+//
+// try {
+//
+// this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
+// configureServerSocket(this.serverSocket);
+//
+// } catch (IOException e) {
+// throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
+// }
+// try {
+// setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
+// .getFragment()));
+// } catch (URISyntaxException e) {
+//
+// // it could be that the host name contains invalid characters such
+// // as _ on unix platforms
+// // so lets try use the IP address instead
+// try {
+// setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
+// } catch (URISyntaxException e2) {
+// throw IOExceptionSupport.create(e2);
+// }
+// }
+// }
+//
+// private void configureServerSocket(ServerSocket socket) throws SocketException {
+// socket.setSoTimeout(2000);
+// if (transportOptions != null) {
+// IntrospectionSupport.setProperties(socket, transportOptions);
+// }
+// }
+//
+// /**
+// * @return Returns the wireFormatFactory.
+// */
+// public WireFormatFactory getWireFormatFactory() {
+// return wireFormatFactory;
+// }
+//
+// /**
+// * @param wireFormatFactory The wireFormatFactory to set.
+// */
+// public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
+// this.wireFormatFactory = wireFormatFactory;
+// }
+//
+// public long getMaxInactivityDuration() {
+// return maxInactivityDuration;
+// }
+//
+// public void setMaxInactivityDuration(long maxInactivityDuration) {
+// this.maxInactivityDuration = maxInactivityDuration;
+// }
+//
+// public long getMaxInactivityDurationInitalDelay() {
+// return this.maxInactivityDurationInitalDelay;
+// }
+//
+// public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
+// this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
+// }
+//
+// public int getMinmumWireFormatVersion() {
+// return minmumWireFormatVersion;
+// }
+//
+// public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
+// this.minmumWireFormatVersion = minmumWireFormatVersion;
+// }
+//
+// public boolean isTrace() {
+// return trace;
+// }
+//
+// public void setTrace(boolean trace) {
+// this.trace = trace;
+// }
+////
+//// public String getLogWriterName() {
+//// return logWriterName;
+//// }
+////
+//// public void setLogWriterName(String logFormat) {
+//// this.logWriterName = logFormat;
+//// }
+//
+// public boolean isDynamicManagement() {
+// return dynamicManagement;
+// }
+//
+// public void setDynamicManagement(boolean useJmx) {
+// this.dynamicManagement = useJmx;
+// }
+//
+// public boolean isStartLogging() {
+// return startLogging;
+// }
+//
+//
+// public void setStartLogging(boolean startLogging) {
+// this.startLogging = startLogging;
+// }
+//
+// /**
+// * @return the backlog
+// */
+// public int getBacklog() {
+// return backlog;
+// }
+//
+// /**
+// * @param backlog the backlog to set
+// */
+// public void setBacklog(int backlog) {
+// this.backlog = backlog;
+// }
+//
+// /**
+// * @return the useQueueForAccept
+// */
+// public boolean isUseQueueForAccept() {
+// return useQueueForAccept;
+// }
+//
+// /**
+// * @param useQueueForAccept the useQueueForAccept to set
+// */
+// public void setUseQueueForAccept(boolean useQueueForAccept) {
+// this.useQueueForAccept = useQueueForAccept;
+// }
+//
+//
+// /**
+// * pull Sockets from the ServerSocket
+// */
+// public void run() {
+// while (!isStopped()) {
+// Socket socket = null;
+// try {
+// socket = serverSocket.accept();
+// if (socket != null) {
+// if (isStopped() || getAcceptListener() == null) {
+// socket.close();
+// } else {
+// if (useQueueForAccept) {
+// socketQueue.put(socket);
+// }else {
+// handleSocket(socket);
+// }
+// }
+// }
+// } catch (SocketTimeoutException ste) {
+// // expect this to happen
+// } catch (Exception e) {
+// if (!isStopping()) {
+// onAcceptError(e);
+// } else if (!isStopped()) {
+// LOG.warn("run()", e);
+// onAcceptError(e);
+// }
+// }
+// }
+// }
+//
+// /**
+// * Allow derived classes to override the Transport implementation that this
+// * transport server creates.
+// *
+// * @param socket
+// * @param format
+// * @return
+// * @throws IOException
+// */
+// protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+// return new TcpTransport(format, socket);
+// }
+//
/**
- * @param maximumConnections the maximumConnections to set
+ * @return pretty print of this
*/
- public void setMaximumConnections(int maximumConnections) {
- this.maximumConnections = maximumConnections;
- }
-
-
- public void started(Service service) {
- this.currentTransportCount++;
+ public String toString() {
+ return "" + bindURI;
}
+//
+// /**
+// * @param socket
+// * @param inetAddress
+// * @return real hostName
+// * @throws UnknownHostException
+// */
+//
+// protected void doStart() throws Exception {
+// if(useQueueForAccept) {
+// Runnable run = new Runnable() {
+// public void run() {
+// try {
+// while (!isStopped() && !isStopping()) {
+// Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
+// if (sock != null) {
+// handleSocket(sock);
+// }
+// }
+//
+// } catch (InterruptedException e) {
+// LOG.info("socketQueue interuppted - stopping");
+// if (!isStopping()) {
+// onAcceptError(e);
+// }
+// }
+//
+// }
+//
+// };
+// socketHandlerThread = new Thread(null, run,
+// "ActiveMQ Transport Server Thread Handler: " + toString(),
+// getStackSize());
+// socketHandlerThread.setDaemon(true);
+// //socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
+// socketHandlerThread.start();
+// }
+// super.doStart();
+//
+// }
+//
+// protected void doStop(ServiceStopper stopper) throws Exception {
+// super.doStop(stopper);
+// if (serverSocket != null) {
+// serverSocket.close();
+// }
+// }
+//
+// public InetSocketAddress getSocketAddress() {
+// return (InetSocketAddress)serverSocket.getLocalSocketAddress();
+// }
+//
+// public void setTransportOption(Map<String, Object> transportOptions) {
+// this.transportOptions = transportOptions;
+// }
+//
+//
+// public int getSoTimeout() {
+// return soTimeout;
+// }
+//
+// public void setSoTimeout(int soTimeout) {
+// this.soTimeout = soTimeout;
+// }
+//
+// public int getSocketBufferSize() {
+// return socketBufferSize;
+// }
+//
+// public void setSocketBufferSize(int socketBufferSize) {
+// this.socketBufferSize = socketBufferSize;
+// }
+//
+// public int getConnectionTimeout() {
+// return connectionTimeout;
+// }
+//
+// public void setConnectionTimeout(int connectionTimeout) {
+// this.connectionTimeout = connectionTimeout;
+// }
+//
+// /**
+// * @return the maximumConnections
+// */
+// public int getMaximumConnections() {
+// return maximumConnections;
+// }
+//
+// /**
+// * @param maximumConnections the maximumConnections to set
+// */
+// public void setMaximumConnections(int maximumConnections) {
+// this.maximumConnections = maximumConnections;
+// }
+//
+//
+// public void started(Service service) {
+// this.currentTransportCount++;
+// }
+//
+// public void stopped(Service service) {
+// this.currentTransportCount--;
+// }
- public void stopped(Service service) {
- this.currentTransportCount--;
- }
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml Wed Jul 7 03:24:02 2010
@@ -34,6 +34,12 @@
<dependencies>
<dependency>
+ <groupId>org.fusesource.hawtdispatch</groupId>
+ <artifactId>hawtdispatch</artifactId>
+ <version>${hawtdispatch-version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-transport</artifactId>
</dependency>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java Wed Jul 7 03:24:02 2010
@@ -19,33 +19,25 @@ package org.apache.activemq.apollo;
import java.beans.ExceptionListener;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service;
import org.apache.activemq.dispatch.Dispatcher;
-import org.apache.activemq.transport.DispatchableTransport;
+import org.apache.activemq.transport.CompletionCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
abstract public class Connection implements TransportListener, Service {
protected Transport transport;
protected String name;
- private int priorityLevels;
- protected int outputWindowSize = 1024 * 1024;
- protected int outputResumeThreshold = 900 * 1024;
- protected int inputWindowSize = 1024 * 1024;
- protected int inputResumeThreshold = 512 * 1024;
- protected boolean useAsyncWriteThread = true;
-
- private Dispatcher dispatcher;
- private final AtomicBoolean stopping = new AtomicBoolean();
- private ExecutorService blockingWriter;
- private ExceptionListener exceptionListener;
+ protected DispatchQueue dispatchQueue = Dispatch.createQueue();
+ protected boolean stopping;
+ protected ExceptionListener exceptionListener;
public void setTransport(Transport transport) {
this.transport = transport;
@@ -53,44 +45,20 @@ abstract public class Connection impleme
public void start() throws Exception {
transport.setTransportListener(this);
-
- if (transport instanceof DispatchableTransport) {
- DispatchableTransport dt = ((DispatchableTransport) transport);
- if (name != null) {
- dt.setName(name);
- }
- dt.setDispatcher(getDispatcher());
- } else {
- if (useAsyncWriteThread) {
- blockingWriter = Executors.newSingleThreadExecutor(new ThreadFactory() {
- public Thread newThread(Runnable r) {
- return new Thread(r, "Writer-" + name);
- }
- });
- }
- }
+ transport.setDispatchQueue(dispatchQueue);
transport.start();
}
public void stop() throws Exception {
- stopping.set(true);
+ stopping = true;
if (transport != null) {
transport.stop();
}
- if (blockingWriter != null) {
- blockingWriter.shutdown();
- }
+ dispatchQueue.release();
}
public void setName(String name) {
this.name = name;
- if (blockingWriter != null) {
- blockingWriter.execute(new Runnable() {
- public void run() {
- Thread.currentThread().setName("Writer-" + Connection.this.name);
- }
- });
- }
}
public String getName() {
@@ -104,36 +72,8 @@ abstract public class Connection impleme
write(o, null);
}
- public final void write(final Object o, final Runnable onCompleted) {
- if (blockingWriter == null) {
- try {
- transport.oneway(o);
- if (onCompleted != null) {
- onCompleted.run();
- }
- } catch (IOException e) {
- onException(e);
- }
- } else {
- try {
- blockingWriter.execute(new Runnable() {
- public void run() {
- if (!stopping.get()) {
- try {
- transport.oneway(o);
- if (onCompleted != null) {
- onCompleted.run();
- }
- } catch (IOException e) {
- onException(e);
- }
- }
- }
- });
- } catch (RejectedExecutionException re) {
- // Must be shutting down.
- }
- }
+ public final void write(final Object o, final CompletionCallback callback) {
+ transport.oneway(o, callback);
}
final public void onException(IOException error) {
@@ -149,49 +89,21 @@ abstract public class Connection impleme
}
public void setStopping() {
- stopping.set(true);
+ stopping = true;
}
public boolean isStopping() {
- return stopping.get();
- }
-
- public void transportInterupted() {
- }
-
- public void transportResumed() {
- }
-
- public int getPriorityLevels() {
- return priorityLevels;
- }
-
- public void setPriorityLevels(int priorityLevels) {
- this.priorityLevels = priorityLevels;
- }
-
- public Dispatcher getDispatcher() {
- return dispatcher;
+ return stopping;
}
- public void setDispatcher(Dispatcher dispatcher) {
- this.dispatcher = dispatcher;
+ public void onDisconnected() {
}
- public int getOutputWindowSize() {
- return outputWindowSize;
+ public void onConnected() {
}
- public int getOutputResumeThreshold() {
- return outputResumeThreshold;
- }
-
- public int getInputWindowSize() {
- return inputWindowSize;
- }
-
- public int getInputResumeThreshold() {
- return inputResumeThreshold;
+ public DispatchQueue getDispatchQueue() {
+ return dispatchQueue;
}
public Transport getTransport() {
@@ -206,12 +118,4 @@ abstract public class Connection impleme
this.exceptionListener = exceptionListener;
}
- public boolean isUseAsyncWriteThread() {
- return useAsyncWriteThread;
- }
-
- public void setUseAsyncWriteThread(boolean useAsyncWriteThread) {
- this.useAsyncWriteThread = useAsyncWriteThread;
- }
-
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java Wed Jul 7 03:24:02 2010
@@ -25,9 +25,6 @@ import java.util.concurrent.atomic.Atomi
import org.apache.activemq.Service;
import org.apache.activemq.apollo.Connection;
-import org.apache.activemq.dispatch.Dispatcher;
-import org.apache.activemq.dispatch.DispatcherConfig;
-import org.apache.activemq.dispatch.DispatcherAware;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportServer;
@@ -35,6 +32,8 @@ import org.apache.activemq.util.IOHelper
import org.apache.activemq.util.buffer.AsciiBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
public class Broker implements Service {
@@ -42,16 +41,14 @@ public class Broker implements Service {
static final private Log LOG = LogFactory.getLog(Broker.class);
- public static final int MAX_USER_PRIORITY = 10;
- public static final int MAX_PRIORITY = MAX_USER_PRIORITY + 1;
-
private final ArrayList<Connection> clientConnections = new ArrayList<Connection>();
private final ArrayList<TransportServer> transportServers = new ArrayList<TransportServer>();
private final ArrayList<String> connectUris = new ArrayList<String>();
private final LinkedHashMap<AsciiBuffer, VirtualHost> virtualHosts = new LinkedHashMap<AsciiBuffer, VirtualHost>();
private VirtualHost defaultVirtualHost;
- private Dispatcher dispatcher;
+
+ private DispatchQueue dispatchQueue = Dispatch.createQueue("broker");
private File dataDirectory;
private final class BrokerAcceptListener implements TransportAcceptListener {
@@ -59,8 +56,6 @@ public class Broker implements Service {
BrokerConnection connection = new BrokerConnection();
connection.setBroker(Broker.this);
connection.setTransport(transport);
- connection.setPriorityLevels(MAX_PRIORITY);
- connection.setDispatcher(dispatcher);
clientConnections.add(connection);
try {
connection.start();
@@ -125,21 +120,12 @@ public class Broker implements Service {
// Create the default virtual host if not explicitly defined.
getDefaultVirtualHost();
- // Don't change the state to STARTING yet as we may need to
- // apply some default configuration to this broker instance before it's started.
- if( dispatcher == null ) {
- int threads = Runtime.getRuntime().availableProcessors();
- dispatcher = DispatcherConfig.create(getName(), threads);
- }
-
// Ok now we are ready to start the broker up....
if ( !state.compareAndSet(State.CONFIGURATION, State.STARTING) ) {
throw new IllegalStateException("Can only start a broker that is in the "+State.CONFIGURATION +" state. Broker was "+state.get());
}
try {
- dispatcher.resume();
-
synchronized(virtualHosts) {
for (VirtualHost virtualHost : virtualHosts.values()) {
virtualHost.setBroker(this);
@@ -184,8 +170,6 @@ public class Broker implements Service {
for (VirtualHost virtualHost : virtualHosts.values()) {
stop(virtualHost);
}
-
- dispatcher.release();
state.set(State.STOPPED);
}
@@ -374,17 +358,6 @@ public class Broker implements Service {
}
// /////////////////////////////////////////////////////////////////
- // Property Accessors
- // /////////////////////////////////////////////////////////////////
- public Dispatcher getDispatcher() {
- return dispatcher;
- }
- public void setDispatcher(Dispatcher dispatcher) {
- assertInConfigurationState();
- this.dispatcher = dispatcher;
- }
-
- // /////////////////////////////////////////////////////////////////
// Helper Methods
// /////////////////////////////////////////////////////////////////
@@ -416,10 +389,8 @@ public class Broker implements Service {
}
private void startTransportServer(TransportServer server) throws Exception {
+ server.setDispatchQueue(dispatchQueue);
server.setAcceptListener(new BrokerAcceptListener());
- if (server instanceof DispatcherAware ) {
- ((DispatcherAware) server).setDispatcher(dispatcher);
- }
server.start();
}
@@ -449,6 +420,9 @@ public class Broker implements Service {
}
}
+ public DispatchQueue getDispatchQueue() {
+ return dispatchQueue;
+ }
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Wed Jul 7 03:24:02 2010
@@ -16,40 +16,12 @@
*/
package org.apache.activemq.apollo.broker;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.activemq.Service;
+import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store;
-import org.apache.activemq.broker.store.Store.Callback;
-import org.apache.activemq.broker.store.Store.FatalStoreException;
-import org.apache.activemq.broker.store.Store.KeyNotFoundException;
-import org.apache.activemq.broker.store.Store.MessageRecord;
-import org.apache.activemq.broker.store.Store.QueueQueryResult;
-import org.apache.activemq.broker.store.Store.QueueRecord;
-import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.dispatch.Dispatcher;
-import org.apache.activemq.dispatch.DispatcherAware;
-import org.apache.activemq.flow.AbstractLimitedFlowResource;
-import org.apache.activemq.flow.Flow;
-import org.apache.activemq.flow.FlowController;
-import org.apache.activemq.flow.IFlowResource;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.broker.store.Store.*;
+import org.apache.activemq.flow.*;
import org.apache.activemq.flow.ISinkController.FlowControllable;
-import org.apache.activemq.queue.QueueDescriptor;
import org.apache.activemq.queue.RestoreListener;
import org.apache.activemq.queue.RestoredElement;
import org.apache.activemq.queue.SaveableQueueElement;
@@ -59,8 +31,16 @@ import org.apache.activemq.util.buffer.A
import org.apache.activemq.util.buffer.Buffer;
import org.apache.activemq.util.list.LinkedNode;
import org.apache.activemq.util.list.LinkedNodeList;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
-public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.OperationBase<?>> implements Service, DispatcherAware {
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.OperationBase<?>> implements Service {
private static final boolean DEBUG = false;
@@ -71,7 +51,7 @@ public class BrokerDatabase extends Abst
private final FlowController<OperationBase<?>> storeController;
private final int FLUSH_QUEUE_SIZE = 10000 * 1024;
- private Dispatcher dispatcher;
+ private DispatchQueue dispatcher;
private Thread flushThread;
private AtomicBoolean running = new AtomicBoolean(false);
private DatabaseListener listener;
@@ -328,7 +308,7 @@ public class BrokerDatabase extends Abst
if (requestedDelayedFlushPointer == -1) {
requestedDelayedFlushPointer = delayedFlushPointer;
- dispatcher.getGlobalQueue().dispatchAfter(flushDelayCallback, flushDelay, TimeUnit.MILLISECONDS);
+ Dispatch.getGlobalQueue().dispatchAfter(flushDelay, TimeUnit.MILLISECONDS, flushDelayCallback);
}
}
@@ -566,10 +546,7 @@ public class BrokerDatabase extends Abst
/**
* Deletes the given message from the store for the given queue.
*
- * @param storeTracking
- * The tracking number of the element being deleted
- * @param queue
- * The queue.
+ * @param queueElement
* @return The {@link OperationContext} associated with the operation
*/
public OperationContext<?> deleteQueueElement(SaveableQueueElement<?> queueElement) {
@@ -578,7 +555,7 @@ public class BrokerDatabase extends Abst
/**
* Loads a batch of messages for the specified queue. The loaded messages
- * are given the provided {@link MessageRestoreListener}.
+ * are given the provided {@link RestoreListener}.
* <p>
* <b><i>NOTE:</i></b> This method uses the queue sequence number for the
* message not the store tracking number.
@@ -592,7 +569,7 @@ public class BrokerDatabase extends Abst
* beginning)
* @param maxSequence
* The maximum sequence number to load (-1 if no limit)
- * @param max
+ * @param maxCount
* The maximum number of messages to load (-1 if no limit)
* @param listener
* The listener to which messages should be passed.
@@ -632,7 +609,7 @@ public class BrokerDatabase extends Abst
/**
* This interface is used to execute transacted code.
*
- * It is used by the {@link Store#execute(Callback)} method, often as
+ * It is used by the {@link Store#execute(org.apache.activemq.broker.store.Store.Callback, Runnable)} method, often as
* anonymous class.
*/
public interface Operation<V> extends OperationContext<V> {
@@ -646,8 +623,7 @@ public class BrokerDatabase extends Abst
public boolean beginExecute();
/**
- * Gets called by the
- * {@link Store#add(Operation, ISourceController, boolean)} method
+ * Gets called by the {@link Store}
* within a transactional context. If any exception is thrown including
* Runtime exception, the transaction is rolled back.
*
@@ -760,7 +736,7 @@ public class BrokerDatabase extends Abst
/**
* Gets called by the
- * {@link Store#add(Operation, ISourceController, boolean)} method
+ * {@link Store} method
* within a transactional context. If any exception is thrown including
* Runtime exception, the transaction is rolled back.
*
@@ -867,7 +843,7 @@ public class BrokerDatabase extends Abst
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
- * @param unit the time unit of the timeout argument
+ * @param tu the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
@@ -1288,12 +1264,8 @@ public class BrokerDatabase extends Abst
return store.allocateStoreTracking();
}
- public Dispatcher getDispatcher() {
- return dispatcher;
- }
-
- public void setDispatcher(Dispatcher dispatcher) {
- this.dispatcher = dispatcher;
+ public void setDispatchQueue(DispatchQueue queue) {
+ this.dispatcher = queue;
}
public Store getStore() {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java Wed Jul 7 03:24:02 2010
@@ -22,9 +22,9 @@ import java.util.Collection;
import java.util.HashMap;
import org.apache.activemq.apollo.broker.BrokerDatabase.OperationContext;
+import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.QueueDescriptor;
import org.apache.activemq.queue.SaveableQueueElement;
import org.apache.activemq.util.buffer.AsciiBuffer;
import org.apache.activemq.util.buffer.Buffer;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Wed Jul 7 03:24:02 2010
@@ -22,9 +22,9 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.broker.store.Store.QueueQueryResult;
-import org.apache.activemq.dispatch.Dispatcher;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.PrioritySizeLimiter;
import org.apache.activemq.flow.SizeLimiter;
@@ -33,7 +33,6 @@ import org.apache.activemq.queue.IPartit
import org.apache.activemq.queue.IQueue;
import org.apache.activemq.queue.PartitionedQueue;
import org.apache.activemq.queue.PersistencePolicy;
-import org.apache.activemq.queue.QueueDescriptor;
import org.apache.activemq.queue.QueueStore;
import org.apache.activemq.queue.RestoreListener;
import org.apache.activemq.queue.SaveableQueueElement;
@@ -44,6 +43,7 @@ import org.apache.activemq.util.Mapper;
import org.apache.activemq.util.buffer.AsciiBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.fusesource.hawtdispatch.DispatchQueue;
public class BrokerQueueStore implements QueueStore<Long, MessageDelivery> {
@@ -52,7 +52,7 @@ public class BrokerQueueStore implements
private static final boolean USE_PRIORITY_QUEUES = true;
private BrokerDatabase database;
- private Dispatcher dispatcher;
+ private DispatchQueue dispatchQueue;
private static HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
private static final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<MessageDelivery>() {
@@ -226,8 +226,8 @@ public class BrokerQueueStore implements
this.database = database;
}
- public void setDispatcher(Dispatcher dispatcher) {
- this.dispatcher = dispatcher;
+ public void setDispatchQueue(DispatchQueue dispatchQueue) {
+ this.dispatchQueue = dispatchQueue;
}
public void loadQueues() throws Exception {
@@ -380,7 +380,6 @@ public class BrokerQueueStore implements
}
};
queue = new ExclusivePersistentQueue<Long, MessageDelivery>(name, limiter);
- queue.setDispatcher(dispatcher);
queue.setStore(this);
queue.setPersistencePolicy(DURABLE_QUEUE_PERSISTENCE_POLICY);
queue.setExpirationMapper(EXPIRATION_MAPPER);
@@ -409,7 +408,7 @@ public class BrokerQueueStore implements
break;
}
case QueueDescriptor.SHARED_PRIORITY: {
- PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD, Broker.MAX_PRIORITY);
+ PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD, 10);
limiter.setPriorityMapper(PRIORITY_MAPPER);
limiter.setSizeMapper(SIZE_MAPPER);
SharedPriorityQueue<Long, MessageDelivery> queue = new SharedPriorityQueue<Long, MessageDelivery>(name, limiter);
@@ -444,7 +443,6 @@ public class BrokerQueueStore implements
}
}
ret.getDescriptor().setApplicationType(SUBPARTITION_TYPE);
- ret.setDispatcher(dispatcher);
ret.setStore(this);
ret.setPersistencePolicy(SHARED_QUEUE_PERSISTENCE_POLICY);
ret.setExpirationMapper(EXPIRATION_MAPPER);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java Wed Jul 7 03:24:02 2010
@@ -98,7 +98,6 @@ class TopicSubscription implements Broke
String name = subscription.getResourceName();
IFlowLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 50);
ExclusiveQueue<MessageDelivery> queue = new ExclusiveQueue<MessageDelivery>(flow, name, limiter);
- queue.setDispatcher(host.getBroker().getDispatcher());
queue.setDrain( new QueueDispatchTarget<MessageDelivery>() {
public void drain(MessageDelivery elem, ISourceController<MessageDelivery> controller) {
subscription.add(elem, controller);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java Wed Jul 7 03:24:02 2010
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.activemq.apollo.broker.BrokerDatabase.OperationContext;
import org.apache.activemq.apollo.broker.Transaction.TxOp;
+import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.broker.store.Store.QueueQueryResult;
import org.apache.activemq.flow.ISourceController;
@@ -31,7 +32,6 @@ import org.apache.activemq.flow.SizeLimi
import org.apache.activemq.queue.ExclusivePersistentQueue;
import org.apache.activemq.queue.IQueue;
import org.apache.activemq.queue.PersistencePolicy;
-import org.apache.activemq.queue.QueueDescriptor;
import org.apache.activemq.queue.QueueStore;
import org.apache.activemq.queue.RestoreListener;
import org.apache.activemq.queue.SaveableQueueElement;
@@ -314,7 +314,6 @@ public class TransactionManager {
}
};
queue = new ExclusivePersistentQueue<Long, TxOp>(name, limiter);
- queue.setDispatcher(host.getBroker().getDispatcher());
queue.setStore(txStore);
queue.setPersistencePolicy(DEFAULT_TX_QUEUE_PERSISTENCE_POLICY);
queue.setExpirationMapper(EXPIRATION_MAPPER);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java Wed Jul 7 03:24:02 2010
@@ -31,6 +31,8 @@ import org.apache.activemq.dispatch.inte
import org.apache.activemq.queue.IQueue;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
/**
* @author chirino
@@ -48,6 +50,8 @@ public class VirtualHost implements Serv
private BrokerDatabase database;
private TransactionManager txnManager;
+ private DispatchQueue dispatchQueue = Dispatch.createQueue("virtual-host");
+
public VirtualHost() {
this.router.setVirtualHost(this);
}
@@ -117,14 +121,14 @@ public class VirtualHost implements Serv
database = new BrokerDatabase(store);
}
- database.setDispatcher(broker.getDispatcher());
+ database.setDispatchQueue(broker.getDispatchQueue());
database.start();
router.setDatabase(database);
//Recover queues:
queueStore.setDatabase(database);
- queueStore.setDispatcher(broker.getDispatcher());
+ queueStore.setDispatchQueue(dispatchQueue);
queueStore.loadQueues();
// Create Queue instances
@@ -277,14 +281,14 @@ public class VirtualHost implements Serv
/**
* A destination has been created
*
- * @param destination
+ * @param queue
*/
public void onCreate(Queue queue);
/**
* A destination has been destroyed
*
- * @param destination
+ * @param queue
*/
public void onDestroy(Queue queue);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java Wed Jul 7 03:24:02 2010
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -29,14 +30,18 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.pipe.Pipe;
+import org.apache.activemq.transport.pipe.PipeTransport;
import org.apache.activemq.transport.pipe.PipeTransportFactory;
+import org.apache.activemq.transport.pipe.PipeTransportServer;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import static org.apache.activemq.transport.TransportFactorySupport.configure;
+import static org.apache.activemq.transport.TransportFactorySupport.verify;
+
/**
* Implements the vm transport which behaves like the pipe transport except that
* it can start embedded brokers up on demand.
@@ -46,7 +51,8 @@ import org.apache.commons.logging.LogFac
*/
public class VMTransportFactory extends PipeTransportFactory {
static final private Log LOG = LogFactory.getLog(VMTransportFactory.class);
-
+ private static final String DEFAULT_PIPE_NAME = Broker.DEFAULT_VIRTUAL_HOST_NAME.toString();
+
/**
* This extension of the PipeTransportServer shuts down the broker
* when all the connections are disconnected.
@@ -58,10 +64,9 @@ public class VMTransportFactory extends
private Broker broker;
@Override
- protected PipeTransport createClientTransport(Pipe<Object> pipe) {
+ protected PipeTransport createClientTransport() {
refs.incrementAndGet();
-
- return new PipeTransport(pipe) {
+ return new PipeTransport(this) {
AtomicBoolean stopped = new AtomicBoolean();
@Override
public void stop() throws Exception {
@@ -90,20 +95,13 @@ public class VMTransportFactory extends
}
}
-
- private static final String DEFAULT_PIPE_NAME = Broker.DEFAULT_VIRTUAL_HOST_NAME.toString();
-
- @Override
- public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
- // Wishing right now the options would have been passed to the createTransport(URI location, WireFormat wf) method so we did don't
- // need to remove these here.
- options.remove("create");
- options.remove("broker");
- return super.compositeConfigure(transport, format, options);
+ @Override
+ public TransportServer bind(URI uri) {
+ return new VmTransportServer();
}
-
- @Override
- protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
+
+ @Override
+ public Transport connect(URI location) throws IOException {
try {
String brokerURI = null;
@@ -147,7 +145,6 @@ public class VMTransportFactory extends
// We want to use a vm transport server impl.
VmTransportServer vmTransportServer = (VmTransportServer) TransportFactory.bind(new URI("vm://" + name+"?wireFormat=null"));
vmTransportServer.setBroker(broker);
- vmTransportServer.setWireFormatFactory(wf.getWireFormatFactory());
broker.addTransportServer(vmTransportServer);
broker.start();
@@ -159,9 +156,8 @@ public class VMTransportFactory extends
}
PipeTransport transport = server.connect();
- transport.setWireFormat(wf);
- return transport;
-
+ return verify( configure(transport, options), options);
+
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
} catch (Exception e) {
@@ -169,10 +165,4 @@ public class VMTransportFactory extends
}
}
-
- @Override
- protected PipeTransportServer createTransportServer() {
- return new VmTransportServer();
- }
-
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java Wed Jul 7 03:24:02 2010
@@ -36,7 +36,7 @@ public class VMTransportTest {
@Test()
public void autoCreateBroker() throws Exception {
- Transport connect = TransportFactory.compositeConnect(new URI("vm://test1?wireFormat=mock"));
+ Transport connect = TransportFactory.connect(new URI("vm://test1?wireFormat=mock"));
connect.start();
assertNotNull(connect);
connect.stop();
@@ -44,12 +44,12 @@ public class VMTransportTest {
@Test(expected=IOException.class)
public void noAutoCreateBroker() throws Exception {
- TransportFactory.compositeConnect(new URI("vm://test2?create=false&wireFormat=mock"));
+ TransportFactory.connect(new URI("vm://test2?create=false&wireFormat=mock"));
}
@Test(expected=IllegalArgumentException.class)
public void badOptions() throws Exception {
- TransportFactory.compositeConnect(new URI("vm://test3?crazy-option=false&wireFormat=mock"));
+ TransportFactory.connect(new URI("vm://test3?crazy-option=false&wireFormat=mock"));
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Wed Jul 7 03:24:02 2010
@@ -476,7 +476,6 @@ public abstract class BrokerTestBase {
consumer.setDestination(destination);
consumer.setName("consumer" + (i + 1));
consumer.setTotalConsumerRate(totalConsumerRate);
- consumer.setDispatcher(dispatcher);
return consumer;
}
@@ -498,7 +497,6 @@ public abstract class BrokerTestBase {
producer.setDestination(destination);
producer.setMessageIdGenerator(msgIdGenerator);
producer.setTotalProducerRate(totalProducerRate);
- producer.setDispatcher(dispatcher);
return producer;
}
@@ -508,7 +506,6 @@ public abstract class BrokerTestBase {
Broker broker = new Broker();
broker.addTransportServer(TransportFactory.bind(new URI(bindURI)));
broker.addConnectUri(connectUri);
- broker.setDispatcher(dispatcher);
broker.getDefaultVirtualHost().setStore(createStore(broker));
return broker;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java Wed Jul 7 03:24:02 2010
@@ -9,7 +9,6 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.MetricCounter;
-import org.apache.activemq.transport.DispatchableTransport;
import org.apache.activemq.transport.TransportFactory;
abstract public class RemoteConsumer extends Connection {
@@ -29,11 +28,8 @@ abstract public class RemoteConsumer ext
consumerRate.name("Consumer " + name + " Rate");
totalConsumerRate.add(consumerRate);
- transport = TransportFactory.compositeConnect(uri);
- if(transport instanceof DispatchableTransport)
- {
- schedualWait = true;
- }
+ transport = TransportFactory.connect(uri);
+ schedualWait = true;
initialize();
super.start();
setupSubscription();
@@ -46,12 +42,12 @@ abstract public class RemoteConsumer ext
protected void messageReceived(final ISourceController<MessageDelivery> controller, final MessageDelivery elem) {
if( schedualWait ) {
if (thinkTime > 0) {
- getDispatcher().getGlobalQueue().dispatchAfter(new Runnable(){
+ dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, new Runnable(){
public void run() {
consumerRate.increment();
controller.elementDispatched(elem);
}
- }, thinkTime, TimeUnit.MILLISECONDS);
+ });
}
else
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java Wed Jul 7 03:24:02 2010
@@ -31,7 +31,6 @@ abstract public class RemoteProducer ext
protected String property;
protected MetricAggregator totalProducerRate;
protected MessageDelivery next;
- protected DispatchQueue dispatchQueue;
protected Runnable dispatchTask;
protected String filler;
protected int payloadSize = 20;
@@ -55,13 +54,12 @@ abstract public class RemoteProducer ext
totalProducerRate.add(rate);
- transport = TransportFactory.compositeConnect(uri);
+ transport = TransportFactory.connect(uri);
initialize();
super.start();
setupProducer();
- dispatchQueue = getDispatcher().createSerialQueue(name + "-client", STICK_TO_CALLER_THREAD);
dispatchTask = new Runnable(){
public void run() {
dispatch();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Wed Jul 7 03:24:02 2010
@@ -26,9 +26,9 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.apollo.broker.MessageDelivery;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.StoreFactory;
-import org.apache.activemq.dispatch.Dispatcher;
-import org.apache.activemq.dispatch.DispatcherConfig;
import org.apache.activemq.queue.IQueue;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
/**
* @author cmacnaug
@@ -37,7 +37,7 @@ import org.apache.activemq.queue.IQueue;
public class SharedQueueTest extends TestCase {
- Dispatcher dispatcher;
+ DispatchQueue dispatchQueue;
BrokerDatabase database;
BrokerQueueStore queueStore;
private static final boolean USE_KAHA_DB = true;
@@ -46,8 +46,8 @@ public class SharedQueueTest extends Tes
protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
- protected Dispatcher createDispatcher() {
- return DispatcherConfig.create("test", Runtime.getRuntime().availableProcessors());
+ protected DispatchQueue createDispatcher() {
+ return Dispatch.createQueue();
}
protected int consumerStartDelay = 0;
@@ -63,20 +63,19 @@ public class SharedQueueTest extends Tes
}
protected void startServices() throws Exception {
- dispatcher = createDispatcher();
- dispatcher.resume();
+ dispatchQueue = createDispatcher();
+ dispatchQueue.resume();
database = new BrokerDatabase(createStore());
- database.setDispatcher(dispatcher);
+ database.setDispatchQueue(dispatchQueue);
database.start();
queueStore = new BrokerQueueStore();
queueStore.setDatabase(database);
- queueStore.setDispatcher(dispatcher);
queueStore.loadQueues();
}
protected void stopServices() throws Exception {
database.stop();
- dispatcher.release();
+ dispatchQueue.release();
queues.clear();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java Wed Jul 7 03:24:02 2010
@@ -89,6 +89,7 @@ import org.apache.activemq.management.St
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.state.CommandVisitorAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IdGenerator;
@@ -148,7 +149,7 @@ public class ActiveMQConnection implemen
private int sendTimeout =0;
private boolean sendAcksAsync=true;
- private final Transport transport;
+ private final ResponseCorrelator transport;
private final IdGenerator clientIdGenerator;
private final JMSStatsImpl factoryStats;
private final JMSConnectionStatsImpl stats;
@@ -196,7 +197,7 @@ public class ActiveMQConnection implemen
*/
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
- this.transport = transport;
+ this.transport = new ResponseCorrelator(transport);
this.clientIdGenerator = clientIdGenerator;
this.factoryStats = factoryStats;
@@ -1221,11 +1222,11 @@ public class ActiveMQConnection implemen
}
private void doAsyncSendPacket(Command command) throws JMSException {
- try {
+// try {
this.transport.oneway(command);
- } catch (IOException e) {
- throw JMSExceptionSupport.create(e);
- }
+// } catch (IOException e) {
+// throw JMSExceptionSupport.create(e);
+// }
}
/**
@@ -1820,21 +1821,21 @@ public class ActiveMQConnection implemen
}
}
- public void transportInterupted() {
+ public void onDisconnected() {
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.clearMessagesInProgress();
}
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
- listener.transportInterupted();
+ listener.onDisconnected();
}
}
- public void transportResumed() {
+ public void onConnected() {
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
- listener.transportResumed();
+ listener.onConnected();
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Wed Jul 7 03:24:02 2010
@@ -238,7 +238,7 @@ public class ActiveMQConnectionFactory e
*/
protected Transport createTransport() throws JMSException {
try {
- return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR);
+ return TransportFactory.connect(brokerURL);
} catch (Exception e) {
throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java Wed Jul 7 03:24:02 2010
@@ -43,15 +43,10 @@ public class BrokerXml {
private List<String> transportServers = new ArrayList<String>();
@XmlElement(name="connect-uri")
private List<String> connectUris = new ArrayList<String>();
- @XmlElement(required = false)
- private DispatcherXml dispatcher;
public Broker createMessageBroker() throws Exception {
Broker rc = new Broker();
- if( dispatcher!=null ) {
- rc.setDispatcher(dispatcher.createDispatcher(this));
- }
for (VirtualHostXml element : virtualHosts) {
rc.addVirtualHost(element.createVirtualHost(this));
}
@@ -98,13 +93,4 @@ public class BrokerXml {
this.connectUris = connectUris;
}
-
- public DispatcherXml getDispatcher() {
- return dispatcher;
- }
- public void setDispatcher(DispatcherXml dispatcher) {
- this.dispatcher = dispatcher;
- }
-
-
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java Wed Jul 7 03:24:02 2010
@@ -44,7 +44,6 @@ public class JAXBConfigTest extends Test
LOG.info("Loading broker configuration from the classpath with URI: " + uri);
Broker broker = BrokerFactory.createBroker(uri);
- Dispatcher p = (Dispatcher)broker.getDispatcher();
// assertEquals(4, p.getSize());
// assertEquals("test dispatcher", p.getName());
Modified: activemq/sandbox/activemq-apollo-actor/activemq-kaha/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-kaha/pom.xml?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-kaha/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-kaha/pom.xml Wed Jul 7 03:24:02 2010
@@ -25,11 +25,11 @@
</parent>
<groupId>org.apache.activemq</groupId>
- <artifactId>activemq-kaha</artifactId>
+ <artifactId>activemq-hawtdb</artifactId>
<packaging>jar</packaging>
<version>6.0-SNAPSHOT</version>
- <name>ActiveMQ :: ${artifactId} :: ${version}</name>
+ <name>ActiveMQ :: HawtDB</name>
<dependencies>
@@ -40,8 +40,9 @@
</dependency>
<dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>kahadb</artifactId>
+ <groupId>org.fusesource.hawtdb</groupId>
+ <artifactId>hawtdb</artifactId>
+ <version>${hawtdb-version}</version>
<optional>false</optional>
</dependency>