You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:45:42 UTC

svn commit: r961079 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-stomp/src/main/scala/org/apache/activemq/apoll...

Author: chirino
Date: Wed Jul  7 03:45:42 2010
New Revision: 961079

URL: http://svn.apache.org/viewvc?rev=961079&view=rev
Log:
cleanup connection shutdown logic

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala?rev=961079&r1=961078&r2=961079&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala Wed Jul  7 03:45:42 2010
@@ -52,6 +52,9 @@ class Domain {
 
 }
 
+object Router extends Log {
+
+}
 
 /**
  * Provides a non-blocking concurrent producer to consumer
@@ -65,8 +68,11 @@ class Domain {
  * to the destination. 
  *
  */
-class Router(var queue:DispatchQueue) {
-  
+class Router(var queue:DispatchQueue) extends DispatchLogging {
+
+  override protected def log = Router
+  protected def dispatchQueue:DispatchQueue = queue
+
   trait DestinationNode {
     var targets = List[DeliveryConsumer]()
     var routes = List[DeliveryProducerRoute]()
@@ -189,8 +195,10 @@ trait Route extends Retained {
 
 }
 
-class DeliveryProducerRoute(val destination:Destination, val queue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route {
+class DeliveryProducerRoute(val destination:Destination, val queue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route with DispatchLogging {
 
+  override protected def log = Router
+  protected def dispatchQueue:DispatchQueue = queue
 
   // Retain the queue while we are retained.
   queue.retain
@@ -211,7 +219,7 @@ class DeliveryProducerRoute(val destinat
 
   private def internal_bind(values:List[DeliveryConsumer]) = {
     values.foreach{ x=>
-      println("producer route attaching to conusmer.")
+      debug("producer route attaching to conusmer.")
       targets = x.open_session(queue) :: targets
     }
   }
@@ -220,6 +228,7 @@ class DeliveryProducerRoute(val destinat
     this.targets = this.targets.filterNot { x=>
       val rc = targets.contains(x.consumer)
       if( rc ) {
+        debug("producer route detaching from conusmer.")
         x.close
       }
       rc
@@ -228,6 +237,7 @@ class DeliveryProducerRoute(val destinat
 
   def disconnected() = ^ {
     this.targets.foreach { x=>
+      debug("producer route detaching from conusmer.")
       x.close
       x.consumer.release
     }    

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961079&r1=961078&r2=961079&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 03:45:42 2010
@@ -44,7 +44,7 @@ abstract class Connection() extends Tran
   dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
   
   var name = "connection"
-  var stopping = false;
+  var stopped = false;
 
   var transport:Transport = null
 
@@ -55,13 +55,13 @@ abstract class Connection() extends Tran
   }
 
   def stop() = {
-    stopping=true
+    stopped=true
     transport.stop()
     dispatchQueue.release
   }
 
   def onTransportFailure(error:IOException) = {
-    if (!stopping) {
+    if (!stopped) {
         onFailure(error);
     }
   }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961079&r1=961078&r2=961079&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:45:42 2010
@@ -111,7 +111,7 @@ class StompProtocolHandler extends Proto
 
   override def onTransportDisconnected() = {
     if( !closed ) {
-      info("stop")
+      info("cleaning up resources")
       closed=true;
       if( producerRoute!=null ) {
         host.router.disconnect(producerRoute)
@@ -250,17 +250,21 @@ class StompProtocolHandler extends Proto
   }
 
   private def die(msg:String) = {
-    info("Shutting connection down due to: "+msg)
-    connection.transport.suspendRead
-    connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)), null)
-    ^ {
-      connection.stop()
-    } ->: queue
+    if( !connection.stopped ) {
+      info("Shutting connection down due to: "+msg)
+      connection.transport.suspendRead
+      connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)), null)
+      ^ {
+        connection.stop()
+      } ->: queue
+    }
   }
 
   override def onTransportFailure(error: IOException) = {
-    info(error, "Shutting connection down due to: %s", error)
-    super.onTransportFailure(error);
+    if( !connection.stopped ) {
+      info(error, "Shutting connection down due to: %s", error)
+      super.onTransportFailure(error);
+    }
   }
 }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961079&r1=961078&r2=961079&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul  7 03:45:42 2010
@@ -84,7 +84,7 @@ class StompRemoteConsumer extends Remote
       transport.suspendRead
       dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
         consumerRate.increment();
-        if (!stopping) {
+        if (!stopped) {
           transport.resumeRead
         }
       })
@@ -116,7 +116,7 @@ class StompRemoteProducer extends Remote
       delivery.setDisposer(^{
         rate.increment();
         val task = ^ {
-          if( !stopping ) {
+          if( !stopped ) {
             send_next
           }
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961079&r1=961078&r2=961079&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 03:45:42 2010
@@ -18,8 +18,6 @@ package org.apache.activemq.transport.tc
 
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.util.buffer.Buffer;
-import org.apache.activemq.util.buffer.ByteArrayOutputStream;
 import org.apache.activemq.util.buffer.DataByteArrayOutputStream;
 import org.apache.activemq.wireformat.WireFormat;
 import org.fusesource.hawtdispatch.Dispatch;
@@ -33,7 +31,6 @@ import java.net.*;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Map;
 
@@ -130,7 +127,7 @@ public class TcpTransport implements Tra
             throw new IllegalArgumentException("listener is not set");
         }
         if (transportState != CREATED) {
-            throw new IllegalStateException("can only be started from the created stae");
+            throw new IllegalStateException("start can only be used from the created state");
         }
         transportState = RUNNING;
 
@@ -166,7 +163,7 @@ public class TcpTransport implements Tra
                             connectSource.release();
                             fireConnected();
                         } catch (IOException e) {
-                            listener.onTransportFailure(e);
+                            onTransportFailure(e);
                         }
                     }
                 }
@@ -202,10 +199,16 @@ public class TcpTransport implements Tra
                 try {
                     drainInbound();
                 } catch (IOException e) {
-                    listener.onTransportFailure(e);
+                    onTransportFailure(e);
                 }
             }
         });
+        readSource.setCancelHandler(new Runnable() {
+            public void run() {
+                readSource.release();
+                releaseResources();
+            }
+        });
 
         writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue);
         writeSource.setEventHandler(new Runnable() {
@@ -219,6 +222,12 @@ public class TcpTransport implements Tra
                 }
             }
         });
+        writeSource.setCancelHandler(new Runnable() {
+            public void run() {
+                writeSource.release();
+                releaseResources();
+            }
+        });
 
         remoteAddress = channel.socket().getRemoteSocketAddress().toString();
         listener.onTransportConnected();
@@ -226,16 +235,41 @@ public class TcpTransport implements Tra
 
 
     public void stop() throws Exception {
-        if (readSource != null) {
-            readSource.release();
-            readSource = null;
-        }
-        if (writeSource != null) {
-            writeSource.release();
-            writeSource = null;
+        if (transportState != RUNNING) {
+            throw new IllegalStateException("stop can only be used from the started state");
         }
-        setDispatchQueue(null);
         transportState = DISPOSED;
+        readSource.cancel();
+        writeSource.cancel();
+    }
+
+    private void releaseResources() {
+        if( writeSource.isReleased() && writeSource.isReleased() ) {
+            try {
+                channel.close();
+            } catch (IOException ignore) {
+            }
+            listener.onTransportDisconnected();
+            OneWay oneWay = outbound.poll();
+            while (oneWay != null) {
+                if (oneWay.retained != null) {
+                    oneWay.retained.release();
+                }
+            }
+            setDispatchQueue(null);
+            next_outbound_buffer = null;
+            outbound_buffer = null;
+            unmarshalSession = null;
+        }
+    }
+
+    public void onTransportFailure(IOException error) {
+        if( socketState == CONNECTED ) {
+            socketState = DISCONNECTED;
+            listener.onTransportFailure(error);
+            readSource.cancel();
+            writeSource.cancel();
+        }
     }
 
 
@@ -253,7 +287,8 @@ public class TcpTransport implements Tra
                 throw new IOException("Not running.");
             }
         } catch (IOException e) {
-            listener.onTransportFailure(e);
+            onTransportFailure(e);
+            return;
         }
 
         boolean wasEmpty = next_outbound_buffer.size()==0;
@@ -265,7 +300,8 @@ public class TcpTransport implements Tra
             try {
                 wireformat.marshal(command, next_outbound_buffer);
             } catch (IOException e) {
-                listener.onTransportFailure(e);
+                onTransportFailure(e);
+                return;
             }
             if ( outbound_buffer.remaining()==0 ) {
                 writeSource.resume();
@@ -298,11 +334,7 @@ public class TcpTransport implements Tra
                         // marshall all the available frames..
                         OneWay oneWay = outbound.poll();
                         while (oneWay != null) {
-                            try {
-                                wireformat.marshal(oneWay.command, next_outbound_buffer);
-                            } catch (IOException e) {
-                                listener.onTransportFailure(e);
-                            }
+                            wireformat.marshal(oneWay.command, next_outbound_buffer);
                             if (oneWay.retained != null) {
                                 oneWay.retained.release();
                             }
@@ -322,13 +354,13 @@ public class TcpTransport implements Tra
             }
 
         } catch (IOException e) {
-            listener.onTransportFailure(e);
+            onTransportFailure(e);
+            return true;
         }
 
         return outbound.isEmpty() && outbound_buffer == null;
     }
 
-
     private void drainInbound() throws IOException {
         if (transportState == DISPOSED || readSource.isSuspended()) {
             return;
@@ -436,259 +468,6 @@ public class TcpTransport implements Tra
         this.socketOptions = socketOptions;
     }
 
-//    private static final Log LOG = LogFactory.getLog(TcpTransport.class);
-//    private static final ThreadPoolExecutor SOCKET_CLOSE;
-//    protected final URI remoteLocation;
-//    protected final URI localLocation;
-//    protected final WireFormat wireFormat;
-//
-//    protected int connectionTimeout = 30000;
-//    protected int soTimeout;
-//    protected int socketBufferSize = 64 * 1024;
-//    protected int ioBufferSize = 8 * 1024;
-//    protected boolean closeAsync = true;
-//    protected Socket socket;
-//    protected DataOutputStream dataOut;
-//    protected DataInputStream dataIn;
-//    protected TcpBufferedOutputStream buffOut = null;
-//
-//    private static final boolean ASYNC_WRITE = false;
-//    /**
-//     * trace=true -> the Transport stack where this TcpTransport object will be,
-//     * will have a TransportLogger layer trace=false -> the Transport stack
-//     * where this TcpTransport object will be, will NOT have a TransportLogger
-//     * layer, and therefore will never be able to print logging messages. This
-//     * parameter is most probably set in Connection or TransportConnector URIs.
-//     */
-//    protected boolean trace = false;
-//    /**
-//     * Name of the LogWriter implementation to use. Names are mapped to classes
-//     * in the
-//     * resources/META-INF/services/org/apache/activemq/transport/logwriters
-//     * directory. This parameter is most probably set in Connection or
-//     * TransportConnector URIs.
-//     */
-//    //    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
-//    /**
-//     * Specifies if the TransportLogger will be manageable by JMX or not. Also,
-//     * as long as there is at least 1 TransportLogger which is manageable, a
-//     * TransportLoggerControl MBean will me created.
-//     */
-//    protected boolean dynamicManagement = false;
-//    /**
-//     * startLogging=true -> the TransportLogger object of the Transport stack
-//     * will initially write messages to the log. startLogging=false -> the
-//     * TransportLogger object of the Transport stack will initially NOT write
-//     * messages to the log. This parameter only has an effect if trace == true.
-//     * This parameter is most probably set in Connection or TransportConnector
-//     * URIs.
-//     */
-//    protected boolean startLogging = true;
-//    /**
-//     * Specifies the port that will be used by the JMX server to manage the
-//     * TransportLoggers. This should only be set in an URI by a client (producer
-//     * or consumer) since a broker will already create a JMX server. It is
-//     * useful for people who test a broker and clients in the same machine and
-//     * want to control both via JMX; a different port will be needed.
-//     */
-//    protected int jmxPort = 1099;
-//    protected int minmumWireFormatVersion;
-//    protected SocketFactory socketFactory;
-//    protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
-//
-//    private Map<String, Object> socketOptions;
-//    private Boolean keepAlive;
-//    private Boolean tcpNoDelay;
-//    private Thread runnerThread;
-//
-//    protected boolean useActivityMonitor;
-//
-//    /**
-//     * Connect to a remote Node - e.g. a Broker
-//     *
-//     * @param wireFormat
-//     * @param socketFactory
-//     * @param remoteLocation
-//     * @param localLocation
-//     *            - e.g. local InetAddress and local port
-//     * @throws IOException
-//     * @throws UnknownHostException
-//     */
-//    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
-//        this.wireFormat = wireFormat;
-//        this.socketFactory = socketFactory;
-//        try {
-//            this.socket = socketFactory.createSocket();
-//        } catch (SocketException e) {
-//            this.socket = null;
-//        }
-//        this.remoteLocation = remoteLocation;
-//        this.localLocation = localLocation;
-//        setDaemon(false);
-//    }
-//
-//    /**
-//     * Initialize from a server Socket
-//     *
-//     * @param wireFormat
-//     * @param socket
-//     * @throws IOException
-//     */
-//    public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
-//        this.wireFormat = wireFormat;
-//        this.socket = socket;
-//        this.remoteLocation = null;
-//        this.localLocation = null;
-//        setDaemon(true);
-//    }
-//
-//    LinkedBlockingQueue<Object> outbound = new LinkedBlockingQueue<Object>();
-//    private Thread onewayThread;
-//
-//    /**
-//     * A one way asynchronous send
-//     */
-//    public void oneway(Object command) throws IOException {
-//        checkStarted();
-//        try {
-//            if (ASYNC_WRITE) {
-//                outbound.put(command);
-//            } else {
-//                wireFormat.marshal(command, dataOut);
-//                dataOut.flush();
-//            }
-//        } catch (InterruptedException e) {
-//            throw new InterruptedIOException();
-//        }
-//    }
-//
-//    protected void sendOneways() {
-//        try {
-//            LOG.debug("Started oneway thead");
-//            while (!isStopped()) {
-//                Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
-//                if (command != null) {
-//                    try {
-//                        // int count=0;
-//                        while (command != null) {
-//                            wireFormat.marshal(command, dataOut);
-//                            // count++;
-//                            command = outbound.poll();
-//                        }
-//                        // System.out.println(count);
-//                        dataOut.flush();
-//                    } catch (IOException e) {
-//                        getTransportListener().onException(e);
-//                    }
-//                }
-//            }
-//        } catch (InterruptedException e) {
-//        }
-//    }
-//
-//    /**
-//     * @return pretty print of 'this'
-//     */
-//    public String toString() {
-//        return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
-//    }
-//
-//    /**
-//     * reads packets from a Socket
-//     */
-//    public void run() {
-//        LOG.trace("TCP consumer thread for " + this + " starting");
-//        this.runnerThread = Thread.currentThread();
-//        try {
-//            while (!isStopped()) {
-//                doRun();
-//            }
-//        } catch (IOException e) {
-//            stoppedLatch.get().countDown();
-//            onException(e);
-//        } catch (Throwable e) {
-//            stoppedLatch.get().countDown();
-//            IOException ioe = new IOException("Unexpected error occured");
-//            ioe.initCause(e);
-//            onException(ioe);
-//        } finally {
-//            stoppedLatch.get().countDown();
-//        }
-//    }
-//
-//    protected void doRun() throws IOException {
-//        try {
-//            Object command = readCommand();
-//            doConsume(command);
-//        } catch (SocketTimeoutException e) {
-//        } catch (InterruptedIOException e) {
-//        }
-//    }
-//
-//    protected Object readCommand() throws IOException {
-//        return wireFormat.unmarshal(dataIn);
-//    }
-//
-//    // Properties
-//    // -------------------------------------------------------------------------
-//
-//    public boolean isTrace() {
-//        return trace;
-//    }
-//
-//    public void setTrace(boolean trace) {
-//        this.trace = trace;
-//    }
-//
-//    void setUseInactivityMonitor(boolean val) {
-//        useActivityMonitor = val;
-//    }
-//
-//    public boolean isUseInactivityMonitor() {
-//        return useActivityMonitor;
-//    }
-//
-//    //    public String getLogWriterName() {
-//    //        return logWriterName;
-//    //    }
-//    //
-//    //    public void setLogWriterName(String logFormat) {
-//    //        this.logWriterName = logFormat;
-//    //    }
-//
-//    public boolean isDynamicManagement() {
-//        return dynamicManagement;
-//    }
-//
-//    public void setDynamicManagement(boolean useJmx) {
-//        this.dynamicManagement = useJmx;
-//    }
-//
-//    public boolean isStartLogging() {
-//        return startLogging;
-//    }
-//
-//    public void setStartLogging(boolean startLogging) {
-//        this.startLogging = startLogging;
-//    }
-//
-//    public int getJmxPort() {
-//        return jmxPort;
-//    }
-//
-//    public void setJmxPort(int jmxPort) {
-//        this.jmxPort = jmxPort;
-//    }
-//
-//    public int getMinmumWireFormatVersion() {
-//        return minmumWireFormatVersion;
-//    }
-//
-//    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
-//        this.minmumWireFormatVersion = minmumWireFormatVersion;
-//    }
-//
-
     public boolean isUseLocalHost() {
         return useLocalHost;
     }
@@ -702,298 +481,4 @@ public class TcpTransport implements Tra
         this.useLocalHost = useLocalHost;
     }
 
-//    public int getSocketBufferSize() {
-//        return socketBufferSize;
-//    }
-//
-//    /**
-//     * Sets the buffer size to use on the socket
-//     */
-//    public void setSocketBufferSize(int socketBufferSize) {
-//        this.socketBufferSize = socketBufferSize;
-//    }
-//
-//    public int getSoTimeout() {
-//        return soTimeout;
-//    }
-//
-//    /**
-//     * Sets the socket timeout
-//     */
-//    public void setSoTimeout(int soTimeout) {
-//        this.soTimeout = soTimeout;
-//    }
-//
-//    public int getConnectionTimeout() {
-//        return connectionTimeout;
-//    }
-//
-//    /**
-//     * Sets the timeout used to connect to the socket
-//     */
-//    public void setConnectionTimeout(int connectionTimeout) {
-//        this.connectionTimeout = connectionTimeout;
-//    }
-//
-//    public Boolean getKeepAlive() {
-//        return keepAlive;
-//    }
-//
-//    /**
-//     * Enable/disable TCP KEEP_ALIVE mode
-//     */
-//    public void setKeepAlive(Boolean keepAlive) {
-//        this.keepAlive = keepAlive;
-//    }
-//
-//    public Boolean getTcpNoDelay() {
-//        return tcpNoDelay;
-//    }
-//
-//    /**
-//     * Enable/disable the TCP_NODELAY option on the socket
-//     */
-//    public void setTcpNoDelay(Boolean tcpNoDelay) {
-//        this.tcpNoDelay = tcpNoDelay;
-//    }
-//
-//    /**
-//     * @return the ioBufferSize
-//     */
-//    public int getIoBufferSize() {
-//        return this.ioBufferSize;
-//    }
-//
-//    /**
-//     * @param ioBufferSize
-//     *            the ioBufferSize to set
-//     */
-//    public void setIoBufferSize(int ioBufferSize) {
-//        this.ioBufferSize = ioBufferSize;
-//    }
-//
-//    /**
-//     * @return the closeAsync
-//     */
-//    public boolean isCloseAsync() {
-//        return closeAsync;
-//    }
-//
-//    /**
-//     * @param closeAsync
-//     *            the closeAsync to set
-//     */
-//    public void setCloseAsync(boolean closeAsync) {
-//        this.closeAsync = closeAsync;
-//    }
-//
-//    // Implementation methods
-//    // -------------------------------------------------------------------------
-//    protected String resolveHostName(String host) throws UnknownHostException {
-//        String localName = InetAddress.getLocalHost().getHostName();
-//        if (localName != null && isUseLocalHost()) {
-//            if (localName.equals(host)) {
-//                return "localhost";
-//            }
-//        }
-//        return host;
-//    }
-//
-//    /**
-//     * Configures the socket for use
-//     *
-//     * @param sock
-//     * @throws SocketException
-//     */
-//    protected void initialiseSocket(Socket sock) throws SocketException {
-//        if (socketOptions != null) {
-//            IntrospectionSupport.setProperties(socket, socketOptions);
-//        }
-//
-//        try {
-//            sock.setReceiveBufferSize(socketBufferSize);
-//            sock.setSendBufferSize(socketBufferSize);
-//        } catch (SocketException se) {
-//            LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
-//            LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
-//        }
-//        sock.setSoTimeout(soTimeout);
-//
-//        if (keepAlive != null) {
-//            sock.setKeepAlive(keepAlive.booleanValue());
-//        }
-//        if (tcpNoDelay != null) {
-//            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
-//        }
-//    }
-//
-//    protected void doStart() throws Exception {
-//        connect();
-//        if (ASYNC_WRITE) {
-//            onewayThread = new Thread() {
-//                @Override
-//                public void run() {
-//                    sendOneways();
-//                }
-//            };
-//            onewayThread.start();
-//        }
-//
-//        stoppedLatch.set(new CountDownLatch(1));
-//        super.doStart();
-//    }
-//
-//    protected void connect() throws Exception {
-//
-//        if (socket == null && socketFactory == null) {
-//            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
-//        }
-//
-//        InetSocketAddress localAddress = null;
-//        InetSocketAddress remoteAddress = null;
-//
-//        if (localLocation != null) {
-//            localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
-//        }
-//
-//        if (remoteLocation != null) {
-//            String host = resolveHostName(remoteLocation.getHost());
-//            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
-//        }
-//
-//        if (socket != null) {
-//
-//            if (localAddress != null) {
-//                socket.bind(localAddress);
-//            }
-//
-//            // If it's a server accepted socket.. we don't need to connect it
-//            // to a remote address.
-//            if (remoteAddress != null) {
-//                if (connectionTimeout >= 0) {
-//                    socket.connect(remoteAddress, connectionTimeout);
-//                } else {
-//                    socket.connect(remoteAddress);
-//                }
-//            }
-//
-//        } else {
-//            // For SSL sockets.. you can't create an unconnected socket :(
-//            // This means the timout option are not supported either.
-//            if (localAddress != null) {
-//                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
-//            } else {
-//                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
-//            }
-//        }
-//
-//        initialiseSocket(socket);
-//        initializeStreams();
-//    }
-//
-//    protected void doStop(ServiceStopper stopper) throws Exception {
-//        if (LOG.isDebugEnabled()) {
-//            LOG.debug("Stopping transport " + this);
-//        }
-//
-//        // Closing the streams flush the sockets before closing.. if the socket
-//        // is hung.. then this hangs the close.
-//        // closeStreams();
-//        if (socket != null) {
-//            if (closeAsync) {
-//                // closing the socket can hang also
-//                final CountDownLatch latch = new CountDownLatch(1);
-//
-//                SOCKET_CLOSE.execute(new Runnable() {
-//
-//                    public void run() {
-//                        try {
-//                            socket.close();
-//                        } catch (IOException e) {
-//                            LOG.debug("Caught exception closing socket", e);
-//                        } finally {
-//                            latch.countDown();
-//                        }
-//                    }
-//
-//                });
-//                latch.await(1, TimeUnit.SECONDS);
-//            } else {
-//                try {
-//                    socket.close();
-//                } catch (IOException e) {
-//                    LOG.debug("Caught exception closing socket", e);
-//                }
-//
-//            }
-//            if (ASYNC_WRITE) {
-//                onewayThread.join();
-//            }
-//        }
-//    }
-//
-//    /**
-//     * Override so that stop() blocks until the run thread is no longer running.
-//     */
-//    @Override
-//    public void stop() throws Exception {
-//        super.stop();
-//        CountDownLatch countDownLatch = stoppedLatch.get();
-//        if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
-//            countDownLatch.await(1, TimeUnit.SECONDS);
-//        }
-//    }
-//
-//    protected void initializeStreams() throws Exception {
-//        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
-//        this.dataIn = new DataInputStream(buffIn);
-//        buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
-//        this.dataOut = new DataOutputStream(buffOut);
-//    }
-//
-//    protected void closeStreams() throws IOException {
-//        if (dataOut != null) {
-//            dataOut.close();
-//        }
-//        if (dataIn != null) {
-//            dataIn.close();
-//        }
-//    }
-//
-//    public void setSocketOptions(Map<String, Object> socketOptions) {
-//        this.socketOptions = new HashMap<String, Object>(socketOptions);
-//    }
-//
-//    public String getRemoteAddress() {
-//        if (socket != null) {
-//            return "" + socket.getRemoteSocketAddress();
-//        }
-//        return null;
-//    }
-//
-//    @Override
-//    public <T> T narrow(Class<T> target) {
-//        if (target == Socket.class) {
-//            return target.cast(socket);
-//        } else if (target == TcpBufferedOutputStream.class) {
-//            return target.cast(buffOut);
-//        }
-//        return super.narrow(target);
-//    }
-//
-//    static {
-//        SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
-//            public Thread newThread(Runnable runnable) {
-//                Thread thread = new Thread(runnable, "TcpSocketClose: " + runnable);
-//                thread.setPriority(Thread.MAX_PRIORITY);
-//                thread.setDaemon(true);
-//                return thread;
-//            }
-//        });
-//    }
-//
-//    public WireFormat getWireformat()
-//    {
-//        return wireFormat;
-//    }
 }