You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/14 07:54:13 UTC
svn commit: r421811 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport:
TransportServerSupport.java TransportServerThreadSupport.java
tcp/TcpTransport.java tcp/TcpTransportFactory.java
tcp/TcpTransportServer.java
Author: chirino
Date: Thu Jul 13 22:54:13 2006
New Revision: 421811
URL: http://svn.apache.org/viewvc?rev=421811&view=rev
Log:
Cleaned up the TCP transport a little.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java?rev=421811&r1=421810&r2=421811&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java Thu Jul 13 22:54:13 2006
@@ -27,18 +27,16 @@
*/
public abstract class TransportServerSupport extends ServiceSupport implements TransportServer {
- private URI location;
+ private URI connectURI;
+ private URI bindLocation;
private TransportAcceptListener acceptListener;
public TransportServerSupport() {
}
public TransportServerSupport(URI location) {
- this.location = location;
- }
-
- public URI getConnectURI() {
- return location;
+ this.connectURI = location;
+ this.bindLocation = location;
}
/**
@@ -60,16 +58,16 @@
/**
* @return Returns the location.
*/
- public URI getLocation() {
- return location;
+ public URI getConnectURI() {
+ return connectURI;
}
/**
* @param location
* The location to set.
*/
- public void setLocation(URI location) {
- this.location = location;
+ public void setConnectURI(URI location) {
+ this.connectURI = location;
}
protected void onAcceptError(Exception e) {
@@ -77,4 +75,12 @@
acceptListener.onAcceptError(e);
}
}
+
+ public URI getBindLocation() {
+ return bindLocation;
+ }
+
+ public void setBindLocation(URI bindLocation) {
+ this.bindLocation = bindLocation;
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java?rev=421811&r1=421810&r2=421811&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java Thu Jul 13 22:54:13 2006
@@ -68,7 +68,7 @@
}
protected void doStart() throws Exception {
- log.info("Listening for connections at: " + getLocation());
+ log.info("Listening for connections at: " + getConnectURI());
runner = new Thread(this, "ActiveMQ Transport Server: "+toString());
runner.setDaemon(daemon);
runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=421811&r1=421810&r2=421811&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Thu Jul 13 22:54:13 2006
@@ -23,6 +23,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
@@ -49,27 +50,19 @@
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
private static final Log log = LogFactory.getLog(TcpTransport.class);
- private int connectionTimeout = 30000;
- private int soTimeout = 0;
- private int socketBufferSize = 128 * 1024;
- private Socket socket;
- private DataOutputStream dataOut;
- private DataInputStream dataIn;
- private WireFormat wireFormat;
- private boolean trace;
- private boolean useLocalHost = true;
- private int minmumWireFormatVersion;
- private InetSocketAddress remoteAddress;
- private InetSocketAddress localAddress;
-
- /**
- * Construct basic helpers
- *
- * @param wireFormat
- */
- protected TcpTransport(WireFormat wireFormat) {
- this.wireFormat = wireFormat;
- }
+ protected final URI remoteLocation;
+ protected final URI localLocation;
+ protected final WireFormat wireFormat;
+
+ protected int connectionTimeout = 30000;
+ protected int soTimeout = 0;
+ protected int socketBufferSize = 128 * 1024;
+ protected Socket socket;
+ protected DataOutputStream dataOut;
+ protected DataInputStream dataIn;
+ protected boolean trace;
+ protected boolean useLocalHost = true;
+ protected int minmumWireFormatVersion;
/**
* Connect to a remote Node - e.g. a Broker
@@ -83,10 +76,14 @@
* @throws UnknownHostException
*/
public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
- this(wireFormat);
- this.socket = createSocket(socketFactory, remoteLocation, localLocation);
+ this.wireFormat = wireFormat;
+ this.socket = socketFactory.createSocket();
+ this.remoteLocation = remoteLocation;
+ this.localLocation = localLocation;
+ setDaemon(false);
}
+
/**
* Initialize from a server Socket
*
@@ -95,8 +92,10 @@
* @throws IOException
*/
public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
- this(wireFormat);
+ this.wireFormat = wireFormat;
this.socket = socket;
+ this.remoteLocation = null;
+ this.localLocation = null;
setDaemon(true);
}
@@ -211,29 +210,6 @@
// Implementation methods
// -------------------------------------------------------------------------
- /**
- * Factory method to create a new socket
- *
- * @param remoteLocation
- * @param localLocation ignored if null
- * @return
- * @throws IOException
- * @throws IOException
- * @throws UnknownHostException
- */
- protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
-
- String host = resolveHostName(remoteLocation.getHost());
- remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
-
- if( localLocation!=null ) {
- localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
- }
-
- Socket sock = socketFactory.createSocket();
- return sock;
- }
-
protected String resolveHostName(String host) throws UnknownHostException {
String localName = InetAddress.getLocalHost().getHostName();
if (localName != null && isUseLocalHost()) {
@@ -263,23 +239,33 @@
}
protected void doStart() throws Exception {
- initialiseSocket(socket);
- if( localAddress!=null ) {
+ connect();
+ super.doStart();
+ }
+
+ protected void connect() throws SocketException, IOException {
+
+ initialiseSocket(socket);
+
+ if( localLocation!=null ) {
+ SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
socket.bind(localAddress);
- }
- if (remoteAddress != null) {
+ }
+ if( remoteLocation!=null ) {
+ String host = resolveHostName(remoteLocation.getHost());
+ InetSocketAddress remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
if (connectionTimeout >= 0) {
socket.connect(remoteAddress, connectionTimeout);
}
else {
socket.connect(remoteAddress);
}
- }
+ }
+
initializeStreams();
- super.doStart();
- }
+ }
- protected void doStop(ServiceStopper stopper) throws Exception {
+ protected void doStop(ServiceStopper stopper) throws Exception {
closeStreams();
if (socket != null) {
socket.close();
@@ -303,7 +289,7 @@
}
public void setSocketOptions(Map socketOptions) {
- IntrospectionSupport.setProperties(socket, socketOptions);
+ IntrospectionSupport.setProperties(socket, socketOptions);
}
public String getRemoteAddress() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=421811&r1=421810&r2=421811&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Thu Jul 13 22:54:13 2006
@@ -52,6 +52,7 @@
IntrospectionSupport.setProperties(server, options);
Map transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
server.setTransportOption(transportOptions);
+ server.bind();
return server;
}
@@ -125,7 +126,7 @@
* @throws UnknownHostException
* @throws IOException
*/
- private TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+ protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new TcpTransport(wf, socketFactory, location, localLocation);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=421811&r1=421810&r2=421811&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Thu Jul 13 22:54:13 2006
@@ -35,6 +35,7 @@
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerThreadSupport;
+import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,24 +49,47 @@
*/
public class TcpTransportServer extends TransportServerThreadSupport {
+
private static final Log log = LogFactory.getLog(TcpTransportServer.class);
- private ServerSocket serverSocket;
- private int backlog = 5000;
- private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
- private final TcpTransportFactory transportFactory;
- private long maxInactivityDuration = 30000;
- private int minmumWireFormatVersion;
- private boolean trace;
- private Map transportOptions;
+ protected ServerSocket serverSocket;
+ protected int backlog = 5000;
+ protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
+ protected final TcpTransportFactory transportFactory;
+ protected long maxInactivityDuration = 30000;
+ protected int minmumWireFormatVersion;
+ protected boolean trace;
+ protected Map transportOptions;
+ protected final ServerSocketFactory serverSocketFactory;
public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
super(location);
this.transportFactory=transportFactory;
- this.serverSocket = createServerSocket(location, serverSocketFactory);
- this.serverSocket.setSoTimeout(2000);
- updatePhysicalUri(location);
+ this.serverSocketFactory = serverSocketFactory;
}
+ public void bind() throws IOException {
+ URI bind = getBindLocation();
+
+ String host = bind.getHost();
+ host = (host == null || host.length() == 0) ? "localhost" : host;
+ InetAddress addr = InetAddress.getByName(host);
+
+ if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) {
+ this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog);
+ }
+ else {
+ this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
+ }
+ this.serverSocket.setSoTimeout(2000);
+
+ try {
+ setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(bind.getHost()), serverSocket.getLocalPort(), bind.getPath(),
+ bind.getQuery(), bind.getFragment()));
+ } catch (URISyntaxException e) {
+ throw IOExceptionSupport.create(e);
+ }
+ }
+
/**
* @return Returns the wireFormatFactory.
*/
@@ -168,19 +192,7 @@
* @return pretty print of this
*/
public String toString() {
- return ""+getLocation();
- }
-
- /**
- * In cases where we construct ourselves with a zero port we need to
- * regenerate the URI with the real physical port so that people can connect
- * to us via discovery
- *
- * @throws UnknownHostException
- */
- protected void updatePhysicalUri(URI bindAddr) throws URISyntaxException, UnknownHostException {
- setLocation(new URI(bindAddr.getScheme(), bindAddr.getUserInfo(), resolveHostName(bindAddr.getHost()), serverSocket.getLocalPort(), bindAddr.getPath(),
- bindAddr.getQuery(), bindAddr.getFragment()));
+ return ""+getBindLocation();
}
/**
@@ -196,26 +208,6 @@
result = InetAddress.getLocalHost().getHostName();
}
return result;
- }
-
- /**
- * Factory method to create a new ServerSocket
- *
- * @throws UnknownHostException
- * @throws IOException
- */
- protected ServerSocket createServerSocket(URI bind, ServerSocketFactory factory) throws UnknownHostException, IOException {
- ServerSocket answer = null;
- String host = bind.getHost();
- host = (host == null || host.length() == 0) ? "localhost" : host;
- InetAddress addr = InetAddress.getByName(host);
- if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) {
- answer = factory.createServerSocket(bind.getPort(), backlog);
- }
- else {
- answer = factory.createServerSocket(bind.getPort(), backlog, addr);
- }
- return answer;
}
protected void doStop(ServiceStopper stopper) throws Exception {