You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:45:42 UTC
svn commit: r961079 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-stomp/src/main/scala/org/apache/activemq/apoll...
Author: chirino
Date: Wed Jul 7 03:45:42 2010
New Revision: 961079
URL: http://svn.apache.org/viewvc?rev=961079&view=rev
Log:
cleanup connection shutdown logic
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala?rev=961079&r1=961078&r2=961079&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala Wed Jul 7 03:45:42 2010
@@ -52,6 +52,9 @@ class Domain {
}
+object Router extends Log {
+
+}
/**
* Provides a non-blocking concurrent producer to consumer
@@ -65,8 +68,11 @@ class Domain {
* to the destination.
*
*/
-class Router(var queue:DispatchQueue) {
-
+class Router(var queue:DispatchQueue) extends DispatchLogging {
+
+ override protected def log = Router
+ protected def dispatchQueue:DispatchQueue = queue
+
trait DestinationNode {
var targets = List[DeliveryConsumer]()
var routes = List[DeliveryProducerRoute]()
@@ -189,8 +195,10 @@ trait Route extends Retained {
}
-class DeliveryProducerRoute(val destination:Destination, val queue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route {
+class DeliveryProducerRoute(val destination:Destination, val queue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route with DispatchLogging {
+ override protected def log = Router
+ protected def dispatchQueue:DispatchQueue = queue
// Retain the queue while we are retained.
queue.retain
@@ -211,7 +219,7 @@ class DeliveryProducerRoute(val destinat
private def internal_bind(values:List[DeliveryConsumer]) = {
values.foreach{ x=>
- println("producer route attaching to conusmer.")
+ debug("producer route attaching to conusmer.")
targets = x.open_session(queue) :: targets
}
}
@@ -220,6 +228,7 @@ class DeliveryProducerRoute(val destinat
this.targets = this.targets.filterNot { x=>
val rc = targets.contains(x.consumer)
if( rc ) {
+ debug("producer route detaching from conusmer.")
x.close
}
rc
@@ -228,6 +237,7 @@ class DeliveryProducerRoute(val destinat
def disconnected() = ^ {
this.targets.foreach { x=>
+ debug("producer route detaching from conusmer.")
x.close
x.consumer.release
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961079&r1=961078&r2=961079&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 03:45:42 2010
@@ -44,7 +44,7 @@ abstract class Connection() extends Tran
dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
var name = "connection"
- var stopping = false;
+ var stopped = false;
var transport:Transport = null
@@ -55,13 +55,13 @@ abstract class Connection() extends Tran
}
def stop() = {
- stopping=true
+ stopped=true
transport.stop()
dispatchQueue.release
}
def onTransportFailure(error:IOException) = {
- if (!stopping) {
+ if (!stopped) {
onFailure(error);
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961079&r1=961078&r2=961079&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:45:42 2010
@@ -111,7 +111,7 @@ class StompProtocolHandler extends Proto
override def onTransportDisconnected() = {
if( !closed ) {
- info("stop")
+ info("cleaning up resources")
closed=true;
if( producerRoute!=null ) {
host.router.disconnect(producerRoute)
@@ -250,17 +250,21 @@ class StompProtocolHandler extends Proto
}
private def die(msg:String) = {
- info("Shutting connection down due to: "+msg)
- connection.transport.suspendRead
- connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)), null)
- ^ {
- connection.stop()
- } ->: queue
+ if( !connection.stopped ) {
+ info("Shutting connection down due to: "+msg)
+ connection.transport.suspendRead
+ connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)), null)
+ ^ {
+ connection.stop()
+ } ->: queue
+ }
}
override def onTransportFailure(error: IOException) = {
- info(error, "Shutting connection down due to: %s", error)
- super.onTransportFailure(error);
+ if( !connection.stopped ) {
+ info(error, "Shutting connection down due to: %s", error)
+ super.onTransportFailure(error);
+ }
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961079&r1=961078&r2=961079&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul 7 03:45:42 2010
@@ -84,7 +84,7 @@ class StompRemoteConsumer extends Remote
transport.suspendRead
dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
consumerRate.increment();
- if (!stopping) {
+ if (!stopped) {
transport.resumeRead
}
})
@@ -116,7 +116,7 @@ class StompRemoteProducer extends Remote
delivery.setDisposer(^{
rate.increment();
val task = ^ {
- if( !stopping ) {
+ if( !stopped ) {
send_next
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961079&r1=961078&r2=961079&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:45:42 2010
@@ -18,8 +18,6 @@ package org.apache.activemq.transport.tc
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.util.buffer.Buffer;
-import org.apache.activemq.util.buffer.ByteArrayOutputStream;
import org.apache.activemq.util.buffer.DataByteArrayOutputStream;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtdispatch.Dispatch;
@@ -33,7 +31,6 @@ import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Map;
@@ -130,7 +127,7 @@ public class TcpTransport implements Tra
throw new IllegalArgumentException("listener is not set");
}
if (transportState != CREATED) {
- throw new IllegalStateException("can only be started from the created stae");
+ throw new IllegalStateException("start can only be used from the created state");
}
transportState = RUNNING;
@@ -166,7 +163,7 @@ public class TcpTransport implements Tra
connectSource.release();
fireConnected();
} catch (IOException e) {
- listener.onTransportFailure(e);
+ onTransportFailure(e);
}
}
}
@@ -202,10 +199,16 @@ public class TcpTransport implements Tra
try {
drainInbound();
} catch (IOException e) {
- listener.onTransportFailure(e);
+ onTransportFailure(e);
}
}
});
+ readSource.setCancelHandler(new Runnable() {
+ public void run() {
+ readSource.release();
+ releaseResources();
+ }
+ });
writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue);
writeSource.setEventHandler(new Runnable() {
@@ -219,6 +222,12 @@ public class TcpTransport implements Tra
}
}
});
+ writeSource.setCancelHandler(new Runnable() {
+ public void run() {
+ writeSource.release();
+ releaseResources();
+ }
+ });
remoteAddress = channel.socket().getRemoteSocketAddress().toString();
listener.onTransportConnected();
@@ -226,16 +235,41 @@ public class TcpTransport implements Tra
public void stop() throws Exception {
- if (readSource != null) {
- readSource.release();
- readSource = null;
- }
- if (writeSource != null) {
- writeSource.release();
- writeSource = null;
+ if (transportState != RUNNING) {
+ throw new IllegalStateException("stop can only be used from the started state");
}
- setDispatchQueue(null);
transportState = DISPOSED;
+ readSource.cancel();
+ writeSource.cancel();
+ }
+
+ private void releaseResources() {
+ if( writeSource.isReleased() && writeSource.isReleased() ) {
+ try {
+ channel.close();
+ } catch (IOException ignore) {
+ }
+ listener.onTransportDisconnected();
+ OneWay oneWay = outbound.poll();
+ while (oneWay != null) {
+ if (oneWay.retained != null) {
+ oneWay.retained.release();
+ }
+ }
+ setDispatchQueue(null);
+ next_outbound_buffer = null;
+ outbound_buffer = null;
+ unmarshalSession = null;
+ }
+ }
+
+ public void onTransportFailure(IOException error) {
+ if( socketState == CONNECTED ) {
+ socketState = DISCONNECTED;
+ listener.onTransportFailure(error);
+ readSource.cancel();
+ writeSource.cancel();
+ }
}
@@ -253,7 +287,8 @@ public class TcpTransport implements Tra
throw new IOException("Not running.");
}
} catch (IOException e) {
- listener.onTransportFailure(e);
+ onTransportFailure(e);
+ return;
}
boolean wasEmpty = next_outbound_buffer.size()==0;
@@ -265,7 +300,8 @@ public class TcpTransport implements Tra
try {
wireformat.marshal(command, next_outbound_buffer);
} catch (IOException e) {
- listener.onTransportFailure(e);
+ onTransportFailure(e);
+ return;
}
if ( outbound_buffer.remaining()==0 ) {
writeSource.resume();
@@ -298,11 +334,7 @@ public class TcpTransport implements Tra
// marshall all the available frames..
OneWay oneWay = outbound.poll();
while (oneWay != null) {
- try {
- wireformat.marshal(oneWay.command, next_outbound_buffer);
- } catch (IOException e) {
- listener.onTransportFailure(e);
- }
+ wireformat.marshal(oneWay.command, next_outbound_buffer);
if (oneWay.retained != null) {
oneWay.retained.release();
}
@@ -322,13 +354,13 @@ public class TcpTransport implements Tra
}
} catch (IOException e) {
- listener.onTransportFailure(e);
+ onTransportFailure(e);
+ return true;
}
return outbound.isEmpty() && outbound_buffer == null;
}
-
private void drainInbound() throws IOException {
if (transportState == DISPOSED || readSource.isSuspended()) {
return;
@@ -436,259 +468,6 @@ public class TcpTransport implements Tra
this.socketOptions = socketOptions;
}
-// private static final Log LOG = LogFactory.getLog(TcpTransport.class);
-// private static final ThreadPoolExecutor SOCKET_CLOSE;
-// protected final URI remoteLocation;
-// protected final URI localLocation;
-// protected final WireFormat wireFormat;
-//
-// protected int connectionTimeout = 30000;
-// protected int soTimeout;
-// protected int socketBufferSize = 64 * 1024;
-// protected int ioBufferSize = 8 * 1024;
-// protected boolean closeAsync = true;
-// protected Socket socket;
-// protected DataOutputStream dataOut;
-// protected DataInputStream dataIn;
-// protected TcpBufferedOutputStream buffOut = null;
-//
-// private static final boolean ASYNC_WRITE = false;
-// /**
-// * trace=true -> the Transport stack where this TcpTransport object will be,
-// * will have a TransportLogger layer trace=false -> the Transport stack
-// * where this TcpTransport object will be, will NOT have a TransportLogger
-// * layer, and therefore will never be able to print logging messages. This
-// * parameter is most probably set in Connection or TransportConnector URIs.
-// */
-// protected boolean trace = false;
-// /**
-// * Name of the LogWriter implementation to use. Names are mapped to classes
-// * in the
-// * resources/META-INF/services/org/apache/activemq/transport/logwriters
-// * directory. This parameter is most probably set in Connection or
-// * TransportConnector URIs.
-// */
-// // protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
-// /**
-// * Specifies if the TransportLogger will be manageable by JMX or not. Also,
-// * as long as there is at least 1 TransportLogger which is manageable, a
-// * TransportLoggerControl MBean will me created.
-// */
-// protected boolean dynamicManagement = false;
-// /**
-// * startLogging=true -> the TransportLogger object of the Transport stack
-// * will initially write messages to the log. startLogging=false -> the
-// * TransportLogger object of the Transport stack will initially NOT write
-// * messages to the log. This parameter only has an effect if trace == true.
-// * This parameter is most probably set in Connection or TransportConnector
-// * URIs.
-// */
-// protected boolean startLogging = true;
-// /**
-// * Specifies the port that will be used by the JMX server to manage the
-// * TransportLoggers. This should only be set in an URI by a client (producer
-// * or consumer) since a broker will already create a JMX server. It is
-// * useful for people who test a broker and clients in the same machine and
-// * want to control both via JMX; a different port will be needed.
-// */
-// protected int jmxPort = 1099;
-// protected int minmumWireFormatVersion;
-// protected SocketFactory socketFactory;
-// protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
-//
-// private Map<String, Object> socketOptions;
-// private Boolean keepAlive;
-// private Boolean tcpNoDelay;
-// private Thread runnerThread;
-//
-// protected boolean useActivityMonitor;
-//
-// /**
-// * Connect to a remote Node - e.g. a Broker
-// *
-// * @param wireFormat
-// * @param socketFactory
-// * @param remoteLocation
-// * @param localLocation
-// * - e.g. local InetAddress and local port
-// * @throws IOException
-// * @throws UnknownHostException
-// */
-// public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
-// this.wireFormat = wireFormat;
-// this.socketFactory = socketFactory;
-// try {
-// this.socket = socketFactory.createSocket();
-// } catch (SocketException e) {
-// this.socket = null;
-// }
-// this.remoteLocation = remoteLocation;
-// this.localLocation = localLocation;
-// setDaemon(false);
-// }
-//
-// /**
-// * Initialize from a server Socket
-// *
-// * @param wireFormat
-// * @param socket
-// * @throws IOException
-// */
-// public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
-// this.wireFormat = wireFormat;
-// this.socket = socket;
-// this.remoteLocation = null;
-// this.localLocation = null;
-// setDaemon(true);
-// }
-//
-// LinkedBlockingQueue<Object> outbound = new LinkedBlockingQueue<Object>();
-// private Thread onewayThread;
-//
-// /**
-// * A one way asynchronous send
-// */
-// public void oneway(Object command) throws IOException {
-// checkStarted();
-// try {
-// if (ASYNC_WRITE) {
-// outbound.put(command);
-// } else {
-// wireFormat.marshal(command, dataOut);
-// dataOut.flush();
-// }
-// } catch (InterruptedException e) {
-// throw new InterruptedIOException();
-// }
-// }
-//
-// protected void sendOneways() {
-// try {
-// LOG.debug("Started oneway thead");
-// while (!isStopped()) {
-// Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
-// if (command != null) {
-// try {
-// // int count=0;
-// while (command != null) {
-// wireFormat.marshal(command, dataOut);
-// // count++;
-// command = outbound.poll();
-// }
-// // System.out.println(count);
-// dataOut.flush();
-// } catch (IOException e) {
-// getTransportListener().onException(e);
-// }
-// }
-// }
-// } catch (InterruptedException e) {
-// }
-// }
-//
-// /**
-// * @return pretty print of 'this'
-// */
-// public String toString() {
-// return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
-// }
-//
-// /**
-// * reads packets from a Socket
-// */
-// public void run() {
-// LOG.trace("TCP consumer thread for " + this + " starting");
-// this.runnerThread = Thread.currentThread();
-// try {
-// while (!isStopped()) {
-// doRun();
-// }
-// } catch (IOException e) {
-// stoppedLatch.get().countDown();
-// onException(e);
-// } catch (Throwable e) {
-// stoppedLatch.get().countDown();
-// IOException ioe = new IOException("Unexpected error occured");
-// ioe.initCause(e);
-// onException(ioe);
-// } finally {
-// stoppedLatch.get().countDown();
-// }
-// }
-//
-// protected void doRun() throws IOException {
-// try {
-// Object command = readCommand();
-// doConsume(command);
-// } catch (SocketTimeoutException e) {
-// } catch (InterruptedIOException e) {
-// }
-// }
-//
-// protected Object readCommand() throws IOException {
-// return wireFormat.unmarshal(dataIn);
-// }
-//
-// // Properties
-// // -------------------------------------------------------------------------
-//
-// public boolean isTrace() {
-// return trace;
-// }
-//
-// public void setTrace(boolean trace) {
-// this.trace = trace;
-// }
-//
-// void setUseInactivityMonitor(boolean val) {
-// useActivityMonitor = val;
-// }
-//
-// public boolean isUseInactivityMonitor() {
-// return useActivityMonitor;
-// }
-//
-// // public String getLogWriterName() {
-// // return logWriterName;
-// // }
-// //
-// // public void setLogWriterName(String logFormat) {
-// // this.logWriterName = logFormat;
-// // }
-//
-// public boolean isDynamicManagement() {
-// return dynamicManagement;
-// }
-//
-// public void setDynamicManagement(boolean useJmx) {
-// this.dynamicManagement = useJmx;
-// }
-//
-// public boolean isStartLogging() {
-// return startLogging;
-// }
-//
-// public void setStartLogging(boolean startLogging) {
-// this.startLogging = startLogging;
-// }
-//
-// public int getJmxPort() {
-// return jmxPort;
-// }
-//
-// public void setJmxPort(int jmxPort) {
-// this.jmxPort = jmxPort;
-// }
-//
-// public int getMinmumWireFormatVersion() {
-// return minmumWireFormatVersion;
-// }
-//
-// public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
-// this.minmumWireFormatVersion = minmumWireFormatVersion;
-// }
-//
-
public boolean isUseLocalHost() {
return useLocalHost;
}
@@ -702,298 +481,4 @@ public class TcpTransport implements Tra
this.useLocalHost = useLocalHost;
}
-// public int getSocketBufferSize() {
-// return socketBufferSize;
-// }
-//
-// /**
-// * Sets the buffer size to use on the socket
-// */
-// public void setSocketBufferSize(int socketBufferSize) {
-// this.socketBufferSize = socketBufferSize;
-// }
-//
-// public int getSoTimeout() {
-// return soTimeout;
-// }
-//
-// /**
-// * Sets the socket timeout
-// */
-// public void setSoTimeout(int soTimeout) {
-// this.soTimeout = soTimeout;
-// }
-//
-// public int getConnectionTimeout() {
-// return connectionTimeout;
-// }
-//
-// /**
-// * Sets the timeout used to connect to the socket
-// */
-// public void setConnectionTimeout(int connectionTimeout) {
-// this.connectionTimeout = connectionTimeout;
-// }
-//
-// public Boolean getKeepAlive() {
-// return keepAlive;
-// }
-//
-// /**
-// * Enable/disable TCP KEEP_ALIVE mode
-// */
-// public void setKeepAlive(Boolean keepAlive) {
-// this.keepAlive = keepAlive;
-// }
-//
-// public Boolean getTcpNoDelay() {
-// return tcpNoDelay;
-// }
-//
-// /**
-// * Enable/disable the TCP_NODELAY option on the socket
-// */
-// public void setTcpNoDelay(Boolean tcpNoDelay) {
-// this.tcpNoDelay = tcpNoDelay;
-// }
-//
-// /**
-// * @return the ioBufferSize
-// */
-// public int getIoBufferSize() {
-// return this.ioBufferSize;
-// }
-//
-// /**
-// * @param ioBufferSize
-// * the ioBufferSize to set
-// */
-// public void setIoBufferSize(int ioBufferSize) {
-// this.ioBufferSize = ioBufferSize;
-// }
-//
-// /**
-// * @return the closeAsync
-// */
-// public boolean isCloseAsync() {
-// return closeAsync;
-// }
-//
-// /**
-// * @param closeAsync
-// * the closeAsync to set
-// */
-// public void setCloseAsync(boolean closeAsync) {
-// this.closeAsync = closeAsync;
-// }
-//
-// // Implementation methods
-// // -------------------------------------------------------------------------
-// protected String resolveHostName(String host) throws UnknownHostException {
-// String localName = InetAddress.getLocalHost().getHostName();
-// if (localName != null && isUseLocalHost()) {
-// if (localName.equals(host)) {
-// return "localhost";
-// }
-// }
-// return host;
-// }
-//
-// /**
-// * Configures the socket for use
-// *
-// * @param sock
-// * @throws SocketException
-// */
-// protected void initialiseSocket(Socket sock) throws SocketException {
-// if (socketOptions != null) {
-// IntrospectionSupport.setProperties(socket, socketOptions);
-// }
-//
-// try {
-// sock.setReceiveBufferSize(socketBufferSize);
-// sock.setSendBufferSize(socketBufferSize);
-// } catch (SocketException se) {
-// LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
-// LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
-// }
-// sock.setSoTimeout(soTimeout);
-//
-// if (keepAlive != null) {
-// sock.setKeepAlive(keepAlive.booleanValue());
-// }
-// if (tcpNoDelay != null) {
-// sock.setTcpNoDelay(tcpNoDelay.booleanValue());
-// }
-// }
-//
-// protected void doStart() throws Exception {
-// connect();
-// if (ASYNC_WRITE) {
-// onewayThread = new Thread() {
-// @Override
-// public void run() {
-// sendOneways();
-// }
-// };
-// onewayThread.start();
-// }
-//
-// stoppedLatch.set(new CountDownLatch(1));
-// super.doStart();
-// }
-//
-// protected void connect() throws Exception {
-//
-// if (socket == null && socketFactory == null) {
-// throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
-// }
-//
-// InetSocketAddress localAddress = null;
-// InetSocketAddress remoteAddress = null;
-//
-// if (localLocation != null) {
-// localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
-// }
-//
-// if (remoteLocation != null) {
-// String host = resolveHostName(remoteLocation.getHost());
-// remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
-// }
-//
-// if (socket != null) {
-//
-// if (localAddress != null) {
-// socket.bind(localAddress);
-// }
-//
-// // If it's a server accepted socket.. we don't need to connect it
-// // to a remote address.
-// if (remoteAddress != null) {
-// if (connectionTimeout >= 0) {
-// socket.connect(remoteAddress, connectionTimeout);
-// } else {
-// socket.connect(remoteAddress);
-// }
-// }
-//
-// } else {
-// // For SSL sockets.. you can't create an unconnected socket :(
-// // This means the timout option are not supported either.
-// if (localAddress != null) {
-// socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
-// } else {
-// socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
-// }
-// }
-//
-// initialiseSocket(socket);
-// initializeStreams();
-// }
-//
-// protected void doStop(ServiceStopper stopper) throws Exception {
-// if (LOG.isDebugEnabled()) {
-// LOG.debug("Stopping transport " + this);
-// }
-//
-// // Closing the streams flush the sockets before closing.. if the socket
-// // is hung.. then this hangs the close.
-// // closeStreams();
-// if (socket != null) {
-// if (closeAsync) {
-// // closing the socket can hang also
-// final CountDownLatch latch = new CountDownLatch(1);
-//
-// SOCKET_CLOSE.execute(new Runnable() {
-//
-// public void run() {
-// try {
-// socket.close();
-// } catch (IOException e) {
-// LOG.debug("Caught exception closing socket", e);
-// } finally {
-// latch.countDown();
-// }
-// }
-//
-// });
-// latch.await(1, TimeUnit.SECONDS);
-// } else {
-// try {
-// socket.close();
-// } catch (IOException e) {
-// LOG.debug("Caught exception closing socket", e);
-// }
-//
-// }
-// if (ASYNC_WRITE) {
-// onewayThread.join();
-// }
-// }
-// }
-//
-// /**
-// * Override so that stop() blocks until the run thread is no longer running.
-// */
-// @Override
-// public void stop() throws Exception {
-// super.stop();
-// CountDownLatch countDownLatch = stoppedLatch.get();
-// if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
-// countDownLatch.await(1, TimeUnit.SECONDS);
-// }
-// }
-//
-// protected void initializeStreams() throws Exception {
-// TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
-// this.dataIn = new DataInputStream(buffIn);
-// buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
-// this.dataOut = new DataOutputStream(buffOut);
-// }
-//
-// protected void closeStreams() throws IOException {
-// if (dataOut != null) {
-// dataOut.close();
-// }
-// if (dataIn != null) {
-// dataIn.close();
-// }
-// }
-//
-// public void setSocketOptions(Map<String, Object> socketOptions) {
-// this.socketOptions = new HashMap<String, Object>(socketOptions);
-// }
-//
-// public String getRemoteAddress() {
-// if (socket != null) {
-// return "" + socket.getRemoteSocketAddress();
-// }
-// return null;
-// }
-//
-// @Override
-// public <T> T narrow(Class<T> target) {
-// if (target == Socket.class) {
-// return target.cast(socket);
-// } else if (target == TcpBufferedOutputStream.class) {
-// return target.cast(buffOut);
-// }
-// return super.narrow(target);
-// }
-//
-// static {
-// SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
-// public Thread newThread(Runnable runnable) {
-// Thread thread = new Thread(runnable, "TcpSocketClose: " + runnable);
-// thread.setPriority(Thread.MAX_PRIORITY);
-// thread.setDaemon(true);
-// return thread;
-// }
-// });
-// }
-//
-// public WireFormat getWireformat()
-// {
-// return wireFormat;
-// }
}