You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jl...@apache.org on 2006/09/22 06:30:22 UTC
svn commit: r448808 -
/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Author: jlim
Date: Thu Sep 21 21:30:21 2006
New Revision: 448808
URL: http://svn.apache.org/viewvc?view=rev&rev=448808
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-933
Modified:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?view=diff&rev=448808&r1=448807&r2=448808
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Thu Sep 21 21:30:21 2006
@@ -28,6 +28,7 @@
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
+import java.util.HashMap;
import java.util.Map;
import javax.net.SocketFactory;
@@ -50,27 +51,25 @@
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
private static final Log log = LogFactory.getLog(TcpTransport.class);
- private int connectionTimeout = 30000;
- private int soTimeout = 0;
- private int socketBufferSize = 128 * 1024;
- private Socket socket;
- private DataOutputStream dataOut;
- private DataInputStream dataIn;
- private WireFormat wireFormat;
- private boolean trace;
- private boolean useLocalHost = true;
- private int minmumWireFormatVersion;
- private InetSocketAddress remoteAddress;
- private InetSocketAddress localAddress;
+ protected final URI remoteLocation;
+ protected final URI localLocation;
+ protected final WireFormat wireFormat;
+
+ protected int connectionTimeout = 30000;
+ protected int soTimeout = 0;
+ protected int socketBufferSize = 128 * 1024;
+ protected Socket socket;
+ protected DataOutputStream dataOut;
+ protected DataInputStream dataIn;
+ protected boolean trace;
+ protected boolean useLocalHost = true;
+ protected int minmumWireFormatVersion;
+ protected SocketFactory socketFactory;
+
+ private Map socketOptions;
+ private Boolean keepAlive;
+ private Boolean tcpNoDelay;
- /**
- * Construct basic helpers
- *
- * @param wireFormat
- */
- protected TcpTransport(WireFormat wireFormat) {
- this.wireFormat = wireFormat;
- }
/**
* Connect to a remote Node - e.g. a Broker
@@ -84,8 +83,16 @@
* @throws UnknownHostException
*/
public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
- this(wireFormat);
- this.socket = createSocket(socketFactory, remoteLocation, localLocation);
+ this.wireFormat = wireFormat;
+ this.socketFactory = socketFactory;
+ try {
+ this.socket = socketFactory.createSocket();
+ } catch (SocketException e) {
+ this.socket = null;
+ }
+ this.remoteLocation = remoteLocation;
+ this.localLocation = localLocation;
+ setDaemon(false);
}
/**
@@ -96,8 +103,10 @@
* @throws IOException
*/
public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
- this(wireFormat);
+ this.wireFormat = wireFormat;
this.socket = socket;
+ this.remoteLocation = null;
+ this.localLocation = null;
setDaemon(true);
}
@@ -209,32 +218,33 @@
}
- // Implementation methods
- // -------------------------------------------------------------------------
+ public Boolean getKeepAlive() {
+ return keepAlive;
+ }
/**
- * Factory method to create a new socket
- *
- * @param remoteLocation
- * @param localLocation ignored if null
- * @return
- * @throws IOException
- * @throws IOException
- * @throws UnknownHostException
+ * Enable/disable TCP KEEP_ALIVE mode
*/
- protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
-
- String host = resolveHostName(remoteLocation.getHost());
- remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
-
- if( localLocation!=null ) {
- localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
- }
-
- Socket sock = socketFactory.createSocket();
- return sock;
+ public void setKeepAlive(Boolean keepAlive) {
+ this.keepAlive = keepAlive;
}
+ public Boolean getTcpNoDelay() {
+ return tcpNoDelay;
+ }
+
+ /**
+ * Enable/disable the TCP_NODELAY option on the socket
+ */
+ public void setTcpNoDelay(Boolean tcpNoDelay) {
+ this.tcpNoDelay = tcpNoDelay;
+ }
+
+
+ // Implementation methods
+ // -------------------------------------------------------------------------
+
+
protected String resolveHostName(String host) throws UnknownHostException {
String localName = InetAddress.getLocalHost().getHostName();
if (localName != null && isUseLocalHost()) {
@@ -252,6 +262,10 @@
* @throws SocketException
*/
protected void initialiseSocket(Socket sock) throws SocketException {
+ if( socketOptions != null ) {
+ IntrospectionSupport.setProperties(socket, socketOptions);
+ }
+
try {
sock.setReceiveBufferSize(socketBufferSize);
sock.setSendBufferSize(socketBufferSize);
@@ -261,25 +275,67 @@
log.debug("Cannot set socket buffer size. Reason: " + se, se);
}
sock.setSoTimeout(soTimeout);
+
+ if (keepAlive != null) {
+ sock.setKeepAlive(keepAlive.booleanValue());
+ }
+ if (tcpNoDelay != null) {
+ sock.setTcpNoDelay(tcpNoDelay.booleanValue());
+ }
}
protected void doStart() throws Exception {
- initialiseSocket(socket);
- if( localAddress!=null ) {
- socket.bind(localAddress);
- }
- if (remoteAddress != null) {
- if (connectionTimeout >= 0) {
- socket.connect(remoteAddress, connectionTimeout);
- }
- else {
- socket.connect(remoteAddress);
- }
- }
- initializeStreams();
+ connect();
super.doStart();
}
+ protected void connect() throws SocketException, IOException {
+
+ if( socket == null && socketFactory == null ) {
+ throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
+ }
+
+ InetSocketAddress localAddress=null;
+ InetSocketAddress remoteAddress=null;
+
+ if( localLocation!=null ) {
+ localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
+ }
+
+ if( remoteLocation!=null ) {
+ String host = resolveHostName(remoteLocation.getHost());
+ remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+ }
+
+ if( socket!=null ) {
+
+ if( localAddress!=null )
+ socket.bind(localAddress);
+
+ // If it's a server accepted socket.. we don't need to connect it
+ // to a remote address.
+ if ( remoteAddress!=null ) {
+ if (connectionTimeout >= 0) {
+ socket.connect(remoteAddress, connectionTimeout);
+ } else {
+ socket.connect(remoteAddress);
+ }
+ }
+
+ } else {
+ // For SSL sockets.. you can't create an unconnected socket :(
+ // This means the timeout options are not supported either.
+ if( localAddress!=null ) {
+ socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
+ } else {
+ socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
+ }
+ }
+
+ initialiseSocket(socket);
+ initializeStreams();
+ }
+
protected void doStop(ServiceStopper stopper) throws Exception {
// Closing the streams flush the sockets before closing.. if the socket
// is hung.. then this hangs the close.
@@ -306,6 +362,13 @@
}
public void setSocketOptions(Map socketOptions) {
- IntrospectionSupport.setProperties(socket, socketOptions);
+ this.socketOptions = new HashMap(socketOptions);
+ }
+
+ public String getRemoteAddress() {
+ if(socket != null){
+ return "" + socket.getRemoteSocketAddress();
+ }
+ return null;
}
}