You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:24:44 UTC
svn commit: r961062 [2/14] - in /activemq/sandbox/activemq-apollo-actor: ./
activemq-all/ activemq-all/src/test/ide-resources/
activemq-all/src/test/java/org/apache/activemq/jaxb/
activemq-all/src/test/java/org/apache/activemq/legacy/ activemq-all/src/...
Modified: activemq/sandbox/activemq-apollo-actor/activemq-bio/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-bio/pom.xml?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-bio/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-bio/pom.xml Wed Jul 7 03:24:02 2010
@@ -25,11 +25,11 @@
</parent>
<groupId>org.apache.activemq</groupId>
- <artifactId>activemq-bio</artifactId>
+ <artifactId>activemq-nio</artifactId>
<packaging>jar</packaging>
<version>6.0-SNAPSHOT</version>
- <name>ActiveMQ :: BIO</name>
+ <name>ActiveMQ :: NIO</name>
<dependencies>
@@ -37,6 +37,7 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-transport</artifactId>
</dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -48,24 +49,6 @@
<scope>test</scope>
</dependency>
- <!-- In case we want to look at mina..
- <dependency>
- <groupId>org.apache.mina</groupId>
- <artifactId>mina-core</artifactId>
- <version>2.0.0-M4</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jcl</artifactId>
- </dependency>
--->
-
- <!-- Testing Dependencies -->
-
</dependencies>
</project>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:24:02 2010
@@ -16,398 +16,152 @@
*/
package org.apache.activemq.transport.tcp;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import org.apache.activemq.transport.CompletionCallback;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.util.buffer.ByteArrayOutputStream;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.DispatchSource;
+
import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.util.HashMap;
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.net.SocketFactory;
-
-import org.apache.activemq.Service;
-import org.apache.activemq.transport.Transport; //import org.apache.activemq.transport.TransportLoggerFactory;
-import org.apache.activemq.transport.TransportThreadSupport;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.activemq.transport.tcp.TcpTransport.SocketState.*;
+import static org.apache.activemq.transport.tcp.TcpTransport.TransportState.*;
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
*
- * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging
- * improvement modifications)
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
- private static final Log LOG = LogFactory.getLog(TcpTransport.class);
- private static final ThreadPoolExecutor SOCKET_CLOSE;
- protected final URI remoteLocation;
- protected final URI localLocation;
- protected final WireFormat wireFormat;
-
- protected int connectionTimeout = 30000;
- protected int soTimeout;
- protected int socketBufferSize = 64 * 1024;
- protected int ioBufferSize = 8 * 1024;
- protected boolean closeAsync = true;
- protected Socket socket;
- protected DataOutputStream dataOut;
- protected DataInputStream dataIn;
- protected TcpBufferedOutputStream buffOut = null;
-
- private static final boolean ASYNC_WRITE = false;
- /**
- * trace=true -> the Transport stack where this TcpTransport object will be,
- * will have a TransportLogger layer trace=false -> the Transport stack
- * where this TcpTransport object will be, will NOT have a TransportLogger
- * layer, and therefore will never be able to print logging messages. This
- * parameter is most probably set in Connection or TransportConnector URIs.
- */
- protected boolean trace = false;
- /**
- * Name of the LogWriter implementation to use. Names are mapped to classes
- * in the
- * resources/META-INF/services/org/apache/activemq/transport/logwriters
- * directory. This parameter is most probably set in Connection or
- * TransportConnector URIs.
- */
- // protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
- /**
- * Specifies if the TransportLogger will be manageable by JMX or not. Also,
- * as long as there is at least 1 TransportLogger which is manageable, a
- * TransportLoggerControl MBean will me created.
- */
- protected boolean dynamicManagement = false;
- /**
- * startLogging=true -> the TransportLogger object of the Transport stack
- * will initially write messages to the log. startLogging=false -> the
- * TransportLogger object of the Transport stack will initially NOT write
- * messages to the log. This parameter only has an effect if trace == true.
- * This parameter is most probably set in Connection or TransportConnector
- * URIs.
- */
- protected boolean startLogging = true;
- /**
- * Specifies the port that will be used by the JMX server to manage the
- * TransportLoggers. This should only be set in an URI by a client (producer
- * or consumer) since a broker will already create a JMX server. It is
- * useful for people who test a broker and clients in the same machine and
- * want to control both via JMX; a different port will be needed.
- */
- protected int jmxPort = 1099;
- protected boolean useLocalHost = true;
- protected int minmumWireFormatVersion;
- protected SocketFactory socketFactory;
- protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
-
+public class TcpTransport implements Transport {
private Map<String, Object> socketOptions;
- private Boolean keepAlive;
- private Boolean tcpNoDelay;
- private Thread runnerThread;
-
- protected boolean useActivityMonitor;
-
- /**
- * Connect to a remote Node - e.g. a Broker
- *
- * @param wireFormat
- * @param socketFactory
- * @param remoteLocation
- * @param localLocation
- * - e.g. local InetAddress and local port
- * @throws IOException
- * @throws UnknownHostException
- */
- public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
- this.wireFormat = wireFormat;
- this.socketFactory = socketFactory;
- try {
- this.socket = socketFactory.createSocket();
- } catch (SocketException e) {
- this.socket = null;
- }
- this.remoteLocation = remoteLocation;
- this.localLocation = localLocation;
- setDaemon(false);
- }
-
- /**
- * Initialize from a server Socket
- *
- * @param wireFormat
- * @param socket
- * @throws IOException
- */
- public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
- this.wireFormat = wireFormat;
- this.socket = socket;
- this.remoteLocation = null;
- this.localLocation = null;
- setDaemon(true);
- }
-
- LinkedBlockingQueue<Object> outbound = new LinkedBlockingQueue<Object>();
- private Thread onewayThread;
-
- /**
- * A one way asynchronous send
- */
- public void oneway(Object command) throws IOException {
- checkStarted();
- try {
- if (ASYNC_WRITE) {
- outbound.put(command);
- } else {
- wireFormat.marshal(command, dataOut);
- dataOut.flush();
- }
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
- }
-
- protected void sendOneways() {
- try {
- LOG.debug("Started oneway thead");
- while (!isStopped()) {
- Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
- if (command != null) {
- try {
- // int count=0;
- while (command != null) {
- wireFormat.marshal(command, dataOut);
- // count++;
- command = outbound.poll();
- }
- // System.out.println(count);
- dataOut.flush();
- } catch (IOException e) {
- getTransportListener().onException(e);
- }
- }
- }
- } catch (InterruptedException e) {
- }
- }
-
- /**
- * @return pretty print of 'this'
- */
- public String toString() {
- return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
- }
-
- /**
- * reads packets from a Socket
- */
- public void run() {
- LOG.trace("TCP consumer thread for " + this + " starting");
- this.runnerThread = Thread.currentThread();
- try {
- while (!isStopped()) {
- doRun();
- }
- } catch (IOException e) {
- stoppedLatch.get().countDown();
- onException(e);
- } catch (Throwable e) {
- stoppedLatch.get().countDown();
- IOException ioe = new IOException("Unexpected error occured");
- ioe.initCause(e);
- onException(ioe);
- } finally {
- stoppedLatch.get().countDown();
- }
- }
- protected void doRun() throws IOException {
- try {
- Object command = readCommand();
- doConsume(command);
- } catch (SocketTimeoutException e) {
- } catch (InterruptedIOException e) {
- }
+ enum SocketState {
+ CONNECTING,
+ CONNECTED,
+ DISCONNECTED
}
- protected Object readCommand() throws IOException {
- return wireFormat.unmarshal(dataIn);
+ enum TransportState {
+ CREATED,
+ RUNNING,
+ DISPOSED
}
- // Properties
- // -------------------------------------------------------------------------
+ protected URI remoteLocation;
+ protected URI localLocation;
+ private TransportListener listener;
+ private String remoteAddress;
+ private WireFormat wireformat;
- public boolean isTrace() {
- return trace;
- }
+ private SocketChannel channel;
- public void setTrace(boolean trace) {
- this.trace = trace;
- }
+ private SocketState socketState = DISCONNECTED;
+ private TransportState transportState = CREATED;
- void setUseInactivityMonitor(boolean val) {
- useActivityMonitor = val;
- }
+ private DispatchQueue dispatchQueue;
+ private DispatchSource readSource;
+ private DispatchSource writeSource;
- public boolean isUseInactivityMonitor() {
- return useActivityMonitor;
- }
-
- // public String getLogWriterName() {
- // return logWriterName;
- // }
- //
- // public void setLogWriterName(String logFormat) {
- // this.logWriterName = logFormat;
- // }
-
- public boolean isDynamicManagement() {
- return dynamicManagement;
- }
-
- public void setDynamicManagement(boolean useJmx) {
- this.dynamicManagement = useJmx;
- }
-
- public boolean isStartLogging() {
- return startLogging;
- }
-
- public void setStartLogging(boolean startLogging) {
- this.startLogging = startLogging;
- }
-
- public int getJmxPort() {
- return jmxPort;
- }
-
- public void setJmxPort(int jmxPort) {
- this.jmxPort = jmxPort;
- }
-
- public int getMinmumWireFormatVersion() {
- return minmumWireFormatVersion;
- }
-
- public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
- this.minmumWireFormatVersion = minmumWireFormatVersion;
- }
-
- public boolean isUseLocalHost() {
- return useLocalHost;
- }
-
- /**
- * Sets whether 'localhost' or the actual local host name should be used to
- * make local connections. On some operating systems such as Macs its not
- * possible to connect as the local host name so localhost is better.
- */
- public void setUseLocalHost(boolean useLocalHost) {
- this.useLocalHost = useLocalHost;
- }
-
- public int getSocketBufferSize() {
- return socketBufferSize;
- }
+ final LinkedList<OneWay> outbound = new LinkedList<OneWay>();
+ int maxOutbound = 1024*32;
+ ByteBuffer outbound_frame;
+ protected boolean useLocalHost = true;
- /**
- * Sets the buffer size to use on the socket
- */
- public void setSocketBufferSize(int socketBufferSize) {
- this.socketBufferSize = socketBufferSize;
- }
+ static final class OneWay {
+ final Buffer buffer;
+ final CompletionCallback callback;
- public int getSoTimeout() {
- return soTimeout;
+ public OneWay(Buffer buffer, CompletionCallback callback) {
+ this.callback = callback;
+ this.buffer = buffer;
+ }
}
- /**
- * Sets the socket timeout
- */
- public void setSoTimeout(int soTimeout) {
- this.soTimeout = soTimeout;
+ public void connected(SocketChannel channel) {
+ this.channel = channel;
+ this.socketState = CONNECTED;
}
- public int getConnectionTimeout() {
- return connectionTimeout;
+ public void connecting(URI remoteLocation, URI localLocation) throws IOException {
+ this.remoteLocation = remoteLocation;
+ this.localLocation = localLocation;
+ this.socketState = CONNECTING;
}
- /**
- * Sets the timeout used to connect to the socket
- */
- public void setConnectionTimeout(int connectionTimeout) {
- this.connectionTimeout = connectionTimeout;
- }
- public Boolean getKeepAlive() {
- return keepAlive;
+ public DispatchQueue getDispatchQueue() {
+ return dispatchQueue;
}
- /**
- * Enable/disable TCP KEEP_ALIVE mode
- */
- public void setKeepAlive(Boolean keepAlive) {
- this.keepAlive = keepAlive;
+ public void setDispatchQueue(DispatchQueue queue) {
+ if( dispatchQueue!=null ) {
+ dispatchQueue.release();
+ }
+ this.dispatchQueue = queue;
+ if( dispatchQueue!=null ) {
+ dispatchQueue.retain();
+ }
}
- public Boolean getTcpNoDelay() {
- return tcpNoDelay;
- }
+ public void start() throws Exception {
+ if (dispatchQueue == null) {
+ throw new IllegalArgumentException("dispatchQueue is not set");
+ }
+ if (listener == null) {
+ throw new IllegalArgumentException("listener is not set");
+ }
+ if( transportState!=CREATED ) {
+ throw new IllegalStateException("can only be started from the created stae");
+ }
+ transportState=RUNNING;
- /**
- * Enable/disable the TCP_NODELAY option on the socket
- */
- public void setTcpNoDelay(Boolean tcpNoDelay) {
- this.tcpNoDelay = tcpNoDelay;
- }
+ if( socketState == CONNECTING ) {
+ channel = SocketChannel.open();
+ }
+ channel.configureBlocking(false);
+ if( socketState == CONNECTING ) {
- /**
- * @return the ioBufferSize
- */
- public int getIoBufferSize() {
- return this.ioBufferSize;
- }
+ if (localLocation != null) {
+ InetSocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
+ channel.socket().bind(localAddress);
+ }
- /**
- * @param ioBufferSize
- * the ioBufferSize to set
- */
- public void setIoBufferSize(int ioBufferSize) {
- this.ioBufferSize = ioBufferSize;
- }
+ String host = resolveHostName(remoteLocation.getHost());
+ InetSocketAddress remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+ channel.connect(remoteAddress);
- /**
- * @return the closeAsync
- */
- public boolean isCloseAsync() {
- return closeAsync;
+ final DispatchSource connectSource = Dispatch.createSource(channel, SelectionKey.OP_CONNECT, dispatchQueue);
+ connectSource.setEventHandler(new Runnable() {
+ public void run() {
+ if( transportState==RUNNING ) {
+ try {
+ socketState = CONNECTED;
+ channel.finishConnect();
+ connectSource.release();
+ fireConnected();
+ } catch (IOException e) {
+ listener.onException(e);
+ }
+ }
+ }
+ });
+ connectSource.resume();
+ } else {
+ fireConnected();
+ }
}
- /**
- * @param closeAsync
- * the closeAsync to set
- */
- public void setCloseAsync(boolean closeAsync) {
- this.closeAsync = closeAsync;
- }
- // Implementation methods
- // -------------------------------------------------------------------------
protected String resolveHostName(String host) throws UnknownHostException {
String localName = InetAddress.getLocalHost().getHostName();
if (localName != null && isUseLocalHost()) {
@@ -418,201 +172,759 @@ public class TcpTransport extends Transp
return host;
}
- /**
- * Configures the socket for use
- *
- * @param sock
- * @throws SocketException
- */
- protected void initialiseSocket(Socket sock) throws SocketException {
- if (socketOptions != null) {
- IntrospectionSupport.setProperties(socket, socketOptions);
- }
+ private void fireConnected() {
try {
- sock.setReceiveBufferSize(socketBufferSize);
- sock.setSendBufferSize(socketBufferSize);
- } catch (SocketException se) {
- LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
- LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
+ channel.socket().setSendBufferSize(maxOutbound);
+ channel.socket().setReceiveBufferSize(maxOutbound);
+ } catch (SocketException e) {
}
- sock.setSoTimeout(soTimeout);
- if (keepAlive != null) {
- sock.setKeepAlive(keepAlive.booleanValue());
- }
- if (tcpNoDelay != null) {
- sock.setTcpNoDelay(tcpNoDelay.booleanValue());
- }
- }
+ readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue);
+ readSource.setEventHandler(new Runnable(){
+ public void run() {
+ drainInbound();
+ }
+ });
- protected void doStart() throws Exception {
- connect();
- if (ASYNC_WRITE) {
- onewayThread = new Thread() {
- @Override
- public void run() {
- sendOneways();
+ writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue);
+ writeSource.setEventHandler(new Runnable(){
+ public void run() {
+ if( transportState==RUNNING ) {
+ // once the outbound is drained.. we can suspend getting
+ // write events.
+ if( drainOutbound() ) {
+ writeSource.suspend();
+ }
}
- };
- onewayThread.start();
- }
+ }
+ });
- stoppedLatch.set(new CountDownLatch(1));
- super.doStart();
+ remoteAddress = channel.socket().getRemoteSocketAddress().toString();
+ listener.onConnected();
+ readSource.resume();
}
- protected void connect() throws Exception {
-
- if (socket == null && socketFactory == null) {
- throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
- }
-
- InetSocketAddress localAddress = null;
- InetSocketAddress remoteAddress = null;
- if (localLocation != null) {
- localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
+ public void stop() throws Exception {
+ if( readSource!=null ) {
+ readSource.release();
+ readSource = null;
}
-
- if (remoteLocation != null) {
- String host = resolveHostName(remoteLocation.getHost());
- remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+ if( writeSource!=null ) {
+ writeSource.release();
+ writeSource = null;
}
+ setDispatchQueue(null);
+ transportState=DISPOSED;
+ }
- if (socket != null) {
+ @Deprecated
+ public void oneway(Object command) {
+ oneway(command, null);
+ }
- if (localAddress != null) {
- socket.bind(localAddress);
+ public void oneway(Object command, CompletionCallback callback) {
+ try {
+ if( socketState != CONNECTED ) {
+ throw new IllegalStateException("Not connected.");
}
-
- // If it's a server accepted socket.. we don't need to connect it
- // to a remote address.
- if (remoteAddress != null) {
- if (connectionTimeout >= 0) {
- socket.connect(remoteAddress, connectionTimeout);
- } else {
- socket.connect(remoteAddress);
- }
+ } catch (IllegalStateException e) {
+ if( callback!=null ) {
+ callback.onFailure(e);
}
+ }
- } else {
- // For SSL sockets.. you can't create an unconnected socket :(
- // This means the timout option are not supported either.
- if (localAddress != null) {
- socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
- } else {
- socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
- }
+ // Marshall the command.
+ Buffer buffer = null;
+ try {
+ buffer = wireformat.marshal(command);
+ } catch (IOException e) {
+ callback.onFailure(e);
+ return;
}
- initialiseSocket(socket);
- initializeStreams();
- }
+ outbound.add(new OneWay(buffer, callback));
- protected void doStop(ServiceStopper stopper) throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stopping transport " + this);
+ // wait for write ready events if this write
+ // cannot be drained.
+ if( outbound.size()==1 && !drainOutbound() ) {
+ writeSource.resume();
}
+ }
- // Closing the streams flush the sockets before closing.. if the socket
- // is hung.. then this hangs the close.
- // closeStreams();
- if (socket != null) {
- if (closeAsync) {
- // closing the socket can hang also
- final CountDownLatch latch = new CountDownLatch(1);
+ /**
+ * @return true if the outbound has been drained of all objects and there are no in-progress writes.
+ */
+ private boolean drainOutbound() {
+ try {
+
+ while(socketState == CONNECTED) {
+
+ // if we have a pending write that is being sent over the socket...
+ if( outbound_frame!=null ) {
+
+ channel.write(outbound_frame);
+ if( outbound_frame.remaining() != 0 ) {
+ return false;
+ } else {
+ outbound_frame = null;
+ }
- SOCKET_CLOSE.execute(new Runnable() {
+ } else {
- public void run() {
- try {
- socket.close();
- } catch (IOException e) {
- LOG.debug("Caught exception closing socket", e);
- } finally {
- latch.countDown();
- }
+ // marshall all the available frames..
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream(maxOutbound << 2);
+ OneWay oneWay = outbound.poll();
+
+ while( oneWay!=null) {
+ buffer.write(oneWay.buffer);
+ if( oneWay.callback!=null ) {
+ oneWay.callback.onCompletion();
}
+ if( buffer.size() < maxOutbound ) {
+ oneWay = outbound.poll();
+ } else {
+ oneWay = null;
+ }
+ }
+
- });
- latch.await(1, TimeUnit.SECONDS);
- } else {
- try {
- socket.close();
- } catch (IOException e) {
- LOG.debug("Caught exception closing socket", e);
+ if( buffer.size()==0 ) {
+ // the source is now drained...
+ return true;
+ } else {
+ outbound_frame = buffer.toBuffer().toByteBuffer();
}
+ }
}
- if (ASYNC_WRITE) {
- onewayThread.join();
- }
+
+ } catch (IOException e) {
+ listener.onException(e);
}
+
+ return outbound.isEmpty() && outbound_frame==null;
}
- /**
- * Override so that stop() blocks until the run thread is no longer running.
- */
- @Override
- public void stop() throws Exception {
- super.stop();
- CountDownLatch countDownLatch = stoppedLatch.get();
- if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
- countDownLatch.await(1, TimeUnit.SECONDS);
+ private void drainInbound() {
+ Object command = null;
+ // the transport may be suspended after processing a command.
+ while( !readSource.isSuspended() && (command=wireformat.unmarshal(channel))!=null ) {
+ listener.onCommand(command);
}
}
- protected void initializeStreams() throws Exception {
- TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
- this.dataIn = new DataInputStream(buffIn);
- buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
- this.dataOut = new DataOutputStream(buffOut);
+
+ public String getRemoteAddress() {
+ return remoteAddress;
}
- protected void closeStreams() throws IOException {
- if (dataOut != null) {
- dataOut.close();
- }
- if (dataIn != null) {
- dataIn.close();
+ public <T> T narrow(Class<T> target) {
+ if (target.isAssignableFrom(getClass())) {
+ return target.cast(this);
}
+ return null;
}
- public void setSocketOptions(Map<String, Object> socketOptions) {
- this.socketOptions = new HashMap<String, Object>(socketOptions);
+ public void suspend() {
+ readSource.suspend();
}
- public String getRemoteAddress() {
- if (socket != null) {
- return "" + socket.getRemoteSocketAddress();
- }
- return null;
+ public void resume() {
+ readSource.resume();
+ }
+
+ public void reconnect(URI uri, CompletionCallback callback) {
+ throw new UnsupportedOperationException();
}
- @Override
- public <T> T narrow(Class<T> target) {
- if (target == Socket.class) {
- return target.cast(socket);
- } else if (target == TcpBufferedOutputStream.class) {
- return target.cast(buffOut);
- }
- return super.narrow(target);
+ public TransportListener getTransportListener() {
+ return listener;
+ }
+ public void setTransportListener(TransportListener listener) {
+ this.listener = listener;
}
- static {
- SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "TcpSocketClose: " + runnable);
- thread.setPriority(Thread.MAX_PRIORITY);
- thread.setDaemon(true);
- return thread;
- }
- });
+ public WireFormat getWireformat() {
+ return wireformat;
+ }
+ public void setWireformat(WireFormat wireformat) {
+ this.wireformat = wireformat;
+ }
+
+ public boolean isConnected() {
+ return socketState == CONNECTED;
+ }
+
+ public boolean isDisposed() {
+ return transportState == DISPOSED;
+ }
+ public boolean isFaultTolerant() {
+ return false;
+ }
+
+ public void setSocketOptions(Map<String, Object> socketOptions) {
+ this.socketOptions = socketOptions;
}
- public WireFormat getWireformat()
- {
- return wireFormat;
+// private static final Log LOG = LogFactory.getLog(TcpTransport.class);
+// private static final ThreadPoolExecutor SOCKET_CLOSE;
+// protected final URI remoteLocation;
+// protected final URI localLocation;
+// protected final WireFormat wireFormat;
+//
+// protected int connectionTimeout = 30000;
+// protected int soTimeout;
+// protected int socketBufferSize = 64 * 1024;
+// protected int ioBufferSize = 8 * 1024;
+// protected boolean closeAsync = true;
+// protected Socket socket;
+// protected DataOutputStream dataOut;
+// protected DataInputStream dataIn;
+// protected TcpBufferedOutputStream buffOut = null;
+//
+// private static final boolean ASYNC_WRITE = false;
+// /**
+// * trace=true -> the Transport stack where this TcpTransport object will be,
+// * will have a TransportLogger layer trace=false -> the Transport stack
+// * where this TcpTransport object will be, will NOT have a TransportLogger
+// * layer, and therefore will never be able to print logging messages. This
+// * parameter is most probably set in Connection or TransportConnector URIs.
+// */
+// protected boolean trace = false;
+// /**
+// * Name of the LogWriter implementation to use. Names are mapped to classes
+// * in the
+// * resources/META-INF/services/org/apache/activemq/transport/logwriters
+// * directory. This parameter is most probably set in Connection or
+// * TransportConnector URIs.
+// */
+// // protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
+// /**
+// * Specifies if the TransportLogger will be manageable by JMX or not. Also,
+// * as long as there is at least 1 TransportLogger which is manageable, a
+// * TransportLoggerControl MBean will me created.
+// */
+// protected boolean dynamicManagement = false;
+// /**
+// * startLogging=true -> the TransportLogger object of the Transport stack
+// * will initially write messages to the log. startLogging=false -> the
+// * TransportLogger object of the Transport stack will initially NOT write
+// * messages to the log. This parameter only has an effect if trace == true.
+// * This parameter is most probably set in Connection or TransportConnector
+// * URIs.
+// */
+// protected boolean startLogging = true;
+// /**
+// * Specifies the port that will be used by the JMX server to manage the
+// * TransportLoggers. This should only be set in an URI by a client (producer
+// * or consumer) since a broker will already create a JMX server. It is
+// * useful for people who test a broker and clients in the same machine and
+// * want to control both via JMX; a different port will be needed.
+// */
+// protected int jmxPort = 1099;
+// protected int minmumWireFormatVersion;
+// protected SocketFactory socketFactory;
+// protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
+//
+// private Map<String, Object> socketOptions;
+// private Boolean keepAlive;
+// private Boolean tcpNoDelay;
+// private Thread runnerThread;
+//
+// protected boolean useActivityMonitor;
+//
+// /**
+// * Connect to a remote Node - e.g. a Broker
+// *
+// * @param wireFormat
+// * @param socketFactory
+// * @param remoteLocation
+// * @param localLocation
+// * - e.g. local InetAddress and local port
+// * @throws IOException
+// * @throws UnknownHostException
+// */
+// public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+// this.wireFormat = wireFormat;
+// this.socketFactory = socketFactory;
+// try {
+// this.socket = socketFactory.createSocket();
+// } catch (SocketException e) {
+// this.socket = null;
+// }
+// this.remoteLocation = remoteLocation;
+// this.localLocation = localLocation;
+// setDaemon(false);
+// }
+//
+// /**
+// * Initialize from a server Socket
+// *
+// * @param wireFormat
+// * @param socket
+// * @throws IOException
+// */
+// public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
+// this.wireFormat = wireFormat;
+// this.socket = socket;
+// this.remoteLocation = null;
+// this.localLocation = null;
+// setDaemon(true);
+// }
+//
+// LinkedBlockingQueue<Object> outbound = new LinkedBlockingQueue<Object>();
+// private Thread onewayThread;
+//
+// /**
+// * A one way asynchronous send
+// */
+// public void oneway(Object command) throws IOException {
+// checkStarted();
+// try {
+// if (ASYNC_WRITE) {
+// outbound.put(command);
+// } else {
+// wireFormat.marshal(command, dataOut);
+// dataOut.flush();
+// }
+// } catch (InterruptedException e) {
+// throw new InterruptedIOException();
+// }
+// }
+//
+// protected void sendOneways() {
+// try {
+// LOG.debug("Started oneway thead");
+// while (!isStopped()) {
+// Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
+// if (command != null) {
+// try {
+// // int count=0;
+// while (command != null) {
+// wireFormat.marshal(command, dataOut);
+// // count++;
+// command = outbound.poll();
+// }
+// // System.out.println(count);
+// dataOut.flush();
+// } catch (IOException e) {
+// getTransportListener().onException(e);
+// }
+// }
+// }
+// } catch (InterruptedException e) {
+// }
+// }
+//
+// /**
+// * @return pretty print of 'this'
+// */
+// public String toString() {
+// return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
+// }
+//
+// /**
+// * reads packets from a Socket
+// */
+// public void run() {
+// LOG.trace("TCP consumer thread for " + this + " starting");
+// this.runnerThread = Thread.currentThread();
+// try {
+// while (!isStopped()) {
+// doRun();
+// }
+// } catch (IOException e) {
+// stoppedLatch.get().countDown();
+// onException(e);
+// } catch (Throwable e) {
+// stoppedLatch.get().countDown();
+// IOException ioe = new IOException("Unexpected error occured");
+// ioe.initCause(e);
+// onException(ioe);
+// } finally {
+// stoppedLatch.get().countDown();
+// }
+// }
+//
+// protected void doRun() throws IOException {
+// try {
+// Object command = readCommand();
+// doConsume(command);
+// } catch (SocketTimeoutException e) {
+// } catch (InterruptedIOException e) {
+// }
+// }
+//
+// protected Object readCommand() throws IOException {
+// return wireFormat.unmarshal(dataIn);
+// }
+//
+// // Properties
+// // -------------------------------------------------------------------------
+//
+// public boolean isTrace() {
+// return trace;
+// }
+//
+// public void setTrace(boolean trace) {
+// this.trace = trace;
+// }
+//
+// void setUseInactivityMonitor(boolean val) {
+// useActivityMonitor = val;
+// }
+//
+// public boolean isUseInactivityMonitor() {
+// return useActivityMonitor;
+// }
+//
+// // public String getLogWriterName() {
+// // return logWriterName;
+// // }
+// //
+// // public void setLogWriterName(String logFormat) {
+// // this.logWriterName = logFormat;
+// // }
+//
+// public boolean isDynamicManagement() {
+// return dynamicManagement;
+// }
+//
+// public void setDynamicManagement(boolean useJmx) {
+// this.dynamicManagement = useJmx;
+// }
+//
+// public boolean isStartLogging() {
+// return startLogging;
+// }
+//
+// public void setStartLogging(boolean startLogging) {
+// this.startLogging = startLogging;
+// }
+//
+// public int getJmxPort() {
+// return jmxPort;
+// }
+//
+// public void setJmxPort(int jmxPort) {
+// this.jmxPort = jmxPort;
+// }
+//
+// public int getMinmumWireFormatVersion() {
+// return minmumWireFormatVersion;
+// }
+//
+// public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
+// this.minmumWireFormatVersion = minmumWireFormatVersion;
+// }
+//
+ public boolean isUseLocalHost() {
+ return useLocalHost;
+ }
+
+ /**
+ * Sets whether 'localhost' or the actual local host name should be used to
+ * make local connections. On some operating systems such as Macs it's not
+ * possible to connect using the local host name, so 'localhost' is better.
+ */
+ public void setUseLocalHost(boolean useLocalHost) {
+ this.useLocalHost = useLocalHost;
}
+
+// public int getSocketBufferSize() {
+// return socketBufferSize;
+// }
+//
+// /**
+// * Sets the buffer size to use on the socket
+// */
+// public void setSocketBufferSize(int socketBufferSize) {
+// this.socketBufferSize = socketBufferSize;
+// }
+//
+// public int getSoTimeout() {
+// return soTimeout;
+// }
+//
+// /**
+// * Sets the socket timeout
+// */
+// public void setSoTimeout(int soTimeout) {
+// this.soTimeout = soTimeout;
+// }
+//
+// public int getConnectionTimeout() {
+// return connectionTimeout;
+// }
+//
+// /**
+// * Sets the timeout used to connect to the socket
+// */
+// public void setConnectionTimeout(int connectionTimeout) {
+// this.connectionTimeout = connectionTimeout;
+// }
+//
+// public Boolean getKeepAlive() {
+// return keepAlive;
+// }
+//
+// /**
+// * Enable/disable TCP KEEP_ALIVE mode
+// */
+// public void setKeepAlive(Boolean keepAlive) {
+// this.keepAlive = keepAlive;
+// }
+//
+// public Boolean getTcpNoDelay() {
+// return tcpNoDelay;
+// }
+//
+// /**
+// * Enable/disable the TCP_NODELAY option on the socket
+// */
+// public void setTcpNoDelay(Boolean tcpNoDelay) {
+// this.tcpNoDelay = tcpNoDelay;
+// }
+//
+// /**
+// * @return the ioBufferSize
+// */
+// public int getIoBufferSize() {
+// return this.ioBufferSize;
+// }
+//
+// /**
+// * @param ioBufferSize
+// * the ioBufferSize to set
+// */
+// public void setIoBufferSize(int ioBufferSize) {
+// this.ioBufferSize = ioBufferSize;
+// }
+//
+// /**
+// * @return the closeAsync
+// */
+// public boolean isCloseAsync() {
+// return closeAsync;
+// }
+//
+// /**
+// * @param closeAsync
+// * the closeAsync to set
+// */
+// public void setCloseAsync(boolean closeAsync) {
+// this.closeAsync = closeAsync;
+// }
+//
+// // Implementation methods
+// // -------------------------------------------------------------------------
+// protected String resolveHostName(String host) throws UnknownHostException {
+// String localName = InetAddress.getLocalHost().getHostName();
+// if (localName != null && isUseLocalHost()) {
+// if (localName.equals(host)) {
+// return "localhost";
+// }
+// }
+// return host;
+// }
+//
+// /**
+// * Configures the socket for use
+// *
+// * @param sock
+// * @throws SocketException
+// */
+// protected void initialiseSocket(Socket sock) throws SocketException {
+// if (socketOptions != null) {
+// IntrospectionSupport.setProperties(socket, socketOptions);
+// }
+//
+// try {
+// sock.setReceiveBufferSize(socketBufferSize);
+// sock.setSendBufferSize(socketBufferSize);
+// } catch (SocketException se) {
+// LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
+// LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
+// }
+// sock.setSoTimeout(soTimeout);
+//
+// if (keepAlive != null) {
+// sock.setKeepAlive(keepAlive.booleanValue());
+// }
+// if (tcpNoDelay != null) {
+// sock.setTcpNoDelay(tcpNoDelay.booleanValue());
+// }
+// }
+//
+// protected void doStart() throws Exception {
+// connect();
+// if (ASYNC_WRITE) {
+// onewayThread = new Thread() {
+// @Override
+// public void run() {
+// sendOneways();
+// }
+// };
+// onewayThread.start();
+// }
+//
+// stoppedLatch.set(new CountDownLatch(1));
+// super.doStart();
+// }
+//
+// protected void connect() throws Exception {
+//
+// if (socket == null && socketFactory == null) {
+// throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
+// }
+//
+// InetSocketAddress localAddress = null;
+// InetSocketAddress remoteAddress = null;
+//
+// if (localLocation != null) {
+// localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
+// }
+//
+// if (remoteLocation != null) {
+// String host = resolveHostName(remoteLocation.getHost());
+// remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+// }
+//
+// if (socket != null) {
+//
+// if (localAddress != null) {
+// socket.bind(localAddress);
+// }
+//
+// // If it's a server accepted socket.. we don't need to connect it
+// // to a remote address.
+// if (remoteAddress != null) {
+// if (connectionTimeout >= 0) {
+// socket.connect(remoteAddress, connectionTimeout);
+// } else {
+// socket.connect(remoteAddress);
+// }
+// }
+//
+// } else {
+// // For SSL sockets.. you can't create an unconnected socket :(
+// // This means the timeout options are not supported either.
+// if (localAddress != null) {
+// socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
+// } else {
+// socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
+// }
+// }
+//
+// initialiseSocket(socket);
+// initializeStreams();
+// }
+//
+// protected void doStop(ServiceStopper stopper) throws Exception {
+// if (LOG.isDebugEnabled()) {
+// LOG.debug("Stopping transport " + this);
+// }
+//
+// // Closing the streams flushes the sockets before closing.. if the socket
+// // is hung.. then this hangs the close.
+// // closeStreams();
+// if (socket != null) {
+// if (closeAsync) {
+// // closing the socket can hang also
+// final CountDownLatch latch = new CountDownLatch(1);
+//
+// SOCKET_CLOSE.execute(new Runnable() {
+//
+// public void run() {
+// try {
+// socket.close();
+// } catch (IOException e) {
+// LOG.debug("Caught exception closing socket", e);
+// } finally {
+// latch.countDown();
+// }
+// }
+//
+// });
+// latch.await(1, TimeUnit.SECONDS);
+// } else {
+// try {
+// socket.close();
+// } catch (IOException e) {
+// LOG.debug("Caught exception closing socket", e);
+// }
+//
+// }
+// if (ASYNC_WRITE) {
+// onewayThread.join();
+// }
+// }
+// }
+//
+// /**
+// * Override so that stop() blocks until the run thread is no longer running.
+// */
+// @Override
+// public void stop() throws Exception {
+// super.stop();
+// CountDownLatch countDownLatch = stoppedLatch.get();
+// if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
+// countDownLatch.await(1, TimeUnit.SECONDS);
+// }
+// }
+//
+// protected void initializeStreams() throws Exception {
+// TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
+// this.dataIn = new DataInputStream(buffIn);
+// buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
+// this.dataOut = new DataOutputStream(buffOut);
+// }
+//
+// protected void closeStreams() throws IOException {
+// if (dataOut != null) {
+// dataOut.close();
+// }
+// if (dataIn != null) {
+// dataIn.close();
+// }
+// }
+//
+// public void setSocketOptions(Map<String, Object> socketOptions) {
+// this.socketOptions = new HashMap<String, Object>(socketOptions);
+// }
+//
+// public String getRemoteAddress() {
+// if (socket != null) {
+// return "" + socket.getRemoteSocketAddress();
+// }
+// return null;
+// }
+//
+// @Override
+// public <T> T narrow(Class<T> target) {
+// if (target == Socket.class) {
+// return target.cast(socket);
+// } else if (target == TcpBufferedOutputStream.class) {
+// return target.cast(buffOut);
+// }
+// return super.narrow(target);
+// }
+//
+// static {
+// SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+// public Thread newThread(Runnable runnable) {
+// Thread thread = new Thread(runnable, "TcpSocketClose: " + runnable);
+// thread.setPriority(Thread.MAX_PRIORITY);
+// thread.setDaemon(true);
+// return thread;
+// }
+// });
+// }
+//
+// public WireFormat getWireformat()
+// {
+// return wireFormat;
+// }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Wed Jul 7 03:24:02 2010
@@ -29,6 +29,7 @@ import javax.net.SocketFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
//import org.apache.activemq.transport.TransportLoggerFactory;
+import org.apache.activemq.transport.TransportFactorySupport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
@@ -37,87 +38,50 @@ import org.apache.activemq.wireformat.Wi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import static org.apache.activemq.transport.TransportFactorySupport.configure;
+import static org.apache.activemq.transport.TransportFactorySupport.verify;
+
/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
* @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
*/
-public class TcpTransportFactory extends TransportFactory {
+public class TcpTransportFactory implements TransportFactory.TransportFactorySPI {
private static final Log LOG = LogFactory.getLog(TcpTransportFactory.class);
- public TransportServer doBind(final URI location) throws IOException {
- try {
- Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
-
- ServerSocketFactory serverSocketFactory = createServerSocketFactory();
- TcpTransportServer server = createTcpTransportServer(location, serverSocketFactory);
- server.setWireFormatFactory(createWireFormatFactory(options));
- IntrospectionSupport.setProperties(server, options);
- Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
- server.setTransportOption(transportOptions);
- server.bind();
-
- return server;
- } catch (URISyntaxException e) {
- throw IOExceptionSupport.create(e);
- }
+ public TransportServer bind(URI location) throws Exception {
+ Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
+ TcpTransportServer server = createTcpTransportServer(location);
+ server.setWireFormatFactory(TransportFactorySupport.createWireFormatFactory(options));
+ IntrospectionSupport.setProperties(server, options);
+ Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
+ server.setTransportOption(transportOptions);
+ return server;
}
/**
* Allows subclasses of TcpTransportFactory to create custom instances of
* TcpTransportServer.
- *
- * @param location
- * @param serverSocketFactory
- * @return
- * @throws IOException
- * @throws URISyntaxException
*/
- protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
- return new TcpTransportServer(this, location, serverSocketFactory);
+ protected TcpTransportServer createTcpTransportServer(final URI location) throws IOException, URISyntaxException {
+ return new TcpTransportServer(location);
}
- public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
- TcpTransport tcpTransport = (TcpTransport)transport.narrow(TcpTransport.class);
- IntrospectionSupport.setProperties(tcpTransport, options);
+ public Transport connect(URI location) throws Exception {
+ Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
+ URI localLocation = getLocalLocation(location);
- Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
- tcpTransport.setSocketOptions(socketOptions);
-
- if (tcpTransport.isTrace()) {
- throw new UnsupportedOperationException("Trace not implemented");
-// try {
-// transport = TransportLoggerFactory.getInstance().createTransportLogger(transport, tcpTransport.getLogWriterName(),
-// tcpTransport.isDynamicManagement(), tcpTransport.isStartLogging(), tcpTransport.getJmxPort());
-// } catch (Throwable e) {
-// LOG.error("Could not create TransportLogger object for: " + tcpTransport.getLogWriterName() + ", reason: " + e, e);
-// }
- }
-
- boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
- tcpTransport.setUseInactivityMonitor(useInactivityMonitor && isUseInactivityMonitor(transport));
-
-
- transport = format.createTransportFilters(transport, options);
-
- return transport;
- }
+ TcpTransport transport = new TcpTransport();
+ transport.connecting(location, localLocation);
- protected String getOption(Map options, String key, String def) {
- String rc = (String) options.remove(key);
- if( rc == null ) {
- rc = def;
- }
- return rc;
- }
+ Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
+ transport.setSocketOptions(socketOptions);
- /**
- * Returns true if the inactivity monitor should be used on the transport
- */
- protected boolean isUseInactivityMonitor(Transport transport) {
- return true;
+ configure(transport, options);
+ return verify(transport, options);
}
- protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
+ private URI getLocalLocation(URI location) {
URI localLocation = null;
String path = location.getPath();
// see if the path is a local URI location
@@ -131,31 +95,15 @@ public class TcpTransportFactory extends
LOG.warn("path isn't a valid local location for TcpTransport to use", e);
}
}
- SocketFactory socketFactory = createSocketFactory();
- return createTcpTransport(wf, socketFactory, location, localLocation);
+ return localLocation;
}
- /**
- * Allows subclasses of TcpTransportFactory to provide a create custom
- * TcpTransport intances.
- *
- * @param location
- * @param wf
- * @param socketFactory
- * @param localLocation
- * @return
- * @throws UnknownHostException
- * @throws IOException
- */
- protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
- return new TcpTransport(wf, socketFactory, location, localLocation);
- }
-
- protected ServerSocketFactory createServerSocketFactory() throws IOException {
- return ServerSocketFactory.getDefault();
+ protected String getOption(Map options, String key, String def) {
+ String rc = (String) options.remove(key);
+ if( rc == null ) {
+ rc = def;
+ }
+ return rc;
}
- protected SocketFactory createSocketFactory() throws IOException {
- return SocketFactory.getDefault();
- }
}