You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jl...@apache.org on 2006/09/22 06:30:22 UTC

svn commit: r448808 - /incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java

Author: jlim
Date: Thu Sep 21 21:30:21 2006
New Revision: 448808

URL: http://svn.apache.org/viewvc?view=rev&rev=448808
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-933

Modified:
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?view=diff&rev=448808&r1=448807&r2=448808
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Thu Sep 21 21:30:21 2006
@@ -28,6 +28,7 @@
 import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.util.HashMap;
 import java.util.Map;
 
 import javax.net.SocketFactory;
@@ -50,27 +51,25 @@
 public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
     private static final Log log = LogFactory.getLog(TcpTransport.class);
 
-    private int connectionTimeout = 30000;
-    private int soTimeout = 0;
-    private int socketBufferSize = 128 * 1024;
-    private Socket socket;
-    private DataOutputStream dataOut;
-    private DataInputStream dataIn;
-    private WireFormat wireFormat;
-    private boolean trace;
-    private boolean useLocalHost = true;
-    private int minmumWireFormatVersion;
-    private InetSocketAddress remoteAddress;
-	private InetSocketAddress localAddress;
+    protected final URI remoteLocation;
+    protected final URI localLocation;
+    protected final WireFormat wireFormat;
+
+    protected int connectionTimeout = 30000;
+    protected int soTimeout = 0;
+    protected int socketBufferSize = 128 * 1024;
+    protected Socket socket;
+    protected DataOutputStream dataOut;
+    protected DataInputStream dataIn;
+    protected boolean trace;
+    protected boolean useLocalHost = true;
+    protected int minmumWireFormatVersion;
+    protected SocketFactory socketFactory;
+
+    private Map socketOptions;    
+    private Boolean keepAlive;
+    private Boolean tcpNoDelay;
     
-    /**
-     * Construct basic helpers
-     * 
-     * @param wireFormat
-     */
-    protected TcpTransport(WireFormat wireFormat) {
-        this.wireFormat = wireFormat;
-    }
 
     /**
      * Connect to a remote Node - e.g. a Broker
@@ -84,8 +83,16 @@
      * @throws UnknownHostException
      */
     public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
-        this(wireFormat);
-        this.socket = createSocket(socketFactory, remoteLocation, localLocation);
+        this.wireFormat = wireFormat;
+	this.socketFactory = socketFactory;
+	try {
+		this.socket = socketFactory.createSocket();
+	} catch (SocketException e) {
+		this.socket = null;
+	}
+	this.remoteLocation = remoteLocation;
+	this.localLocation = localLocation;
+        setDaemon(false);
     }
 
     /**
@@ -96,8 +103,10 @@
      * @throws IOException
      */
     public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
-        this(wireFormat);
+        this.wireFormat = wireFormat;
         this.socket = socket;
+	this.remoteLocation = null;
+	this.localLocation = null;        
         setDaemon(true);
     }
 
@@ -209,32 +218,33 @@
     }
     
 
-    // Implementation methods
-    // -------------------------------------------------------------------------
+    public Boolean getKeepAlive() {
+        return keepAlive;
+    }
 
     /**
-     * Factory method to create a new socket
-     * 
-     * @param remoteLocation
-     * @param localLocation ignored if null
-     * @return
-     * @throws IOException
-     * @throws IOException
-     * @throws UnknownHostException
+     * Enable/disable TCP KEEP_ALIVE mode
      */
-    protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
-    	
-        String host = resolveHostName(remoteLocation.getHost());
-        remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
-        
-        if( localLocation!=null ) {
-        	localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
-        }
-        
-        Socket sock = socketFactory.createSocket();        
-        return sock;
+    public void setKeepAlive(Boolean keepAlive) {
+        this.keepAlive = keepAlive;
     }
 
+    public Boolean getTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    /**
+     * Enable/disable the TCP_NODELAY option on the socket
+     */
+    public void setTcpNoDelay(Boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+
+
     protected String resolveHostName(String host) throws UnknownHostException {
         String localName = InetAddress.getLocalHost().getHostName();
         if (localName != null && isUseLocalHost()) {
@@ -252,6 +262,10 @@
      * @throws SocketException
      */
     protected void initialiseSocket(Socket sock) throws SocketException {
+    	if( socketOptions != null ) {
+    		IntrospectionSupport.setProperties(socket, socketOptions);
+    	}
+    	    	
         try {
             sock.setReceiveBufferSize(socketBufferSize);
             sock.setSendBufferSize(socketBufferSize);
@@ -261,25 +275,67 @@
             log.debug("Cannot set socket buffer size. Reason: " + se, se);
         }
         sock.setSoTimeout(soTimeout);
+
+        if (keepAlive != null) {
+            sock.setKeepAlive(keepAlive.booleanValue());
+        }
+        if (tcpNoDelay != null) {
+            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
+        }        
     }
 
     protected void doStart() throws Exception {
-        initialiseSocket(socket);
-        if( localAddress!=null ) {
-        	socket.bind(localAddress);
-        }
-        if (remoteAddress != null) {
-            if (connectionTimeout >= 0) {
-                socket.connect(remoteAddress, connectionTimeout);
-            }
-            else {
-                socket.connect(remoteAddress);
-            }
-        }
-        initializeStreams();
+        connect();
         super.doStart();
     }
 
+     protected void connect() throws SocketException, IOException {
+		
+	if( socket == null && socketFactory == null ) {
+		throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
+	}
+		
+	InetSocketAddress localAddress=null;
+	InetSocketAddress remoteAddress=null;
+		
+        if( localLocation!=null ) {
+           localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
+        }  
+                      
+	if( remoteLocation!=null ) {
+		String host = resolveHostName(remoteLocation.getHost());
+	        remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+	}
+	
+   	if( socket!=null ) {
+    		
+    		if( localAddress!=null )
+    			socket.bind(localAddress);
+    		
+    		// If it's a server accepted socket.. we don't need to connect it 
+    		// to a remote address.
+    		if ( remoteAddress!=null ) {
+	            if (connectionTimeout >= 0) {
+	                socket.connect(remoteAddress, connectionTimeout);
+	            } else {
+	                socket.connect(remoteAddress);
+	            }
+    		}
+            
+    	} else {
+    		// For SSL sockets.. you can't create an unconnected socket :(
+    		// This means the timeout options are not supported either.
+    		if( localAddress!=null ) {
+            	socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
+    		} else {
+            	socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
+    		}
+    	}
+		
+	initialiseSocket(socket);        
+        initializeStreams();
+	}
+
     protected void doStop(ServiceStopper stopper) throws Exception {
     	// Closing the streams flush the sockets before closing.. if the socket
     	// is hung.. then this hangs the close.
@@ -306,6 +362,13 @@
     }
 
     public void setSocketOptions(Map socketOptions) {
-        IntrospectionSupport.setProperties(socket, socketOptions);
+    	this.socketOptions = new HashMap(socketOptions);
+    }
+
+    public String getRemoteAddress() {
+	if(socket != null){
+		return "" + socket.getRemoteSocketAddress();
+	}
+	return null;
     }
 }