You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/14 07:54:13 UTC

svn commit: r421811 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport: TransportServerSupport.java TransportServerThreadSupport.java tcp/TcpTransport.java tcp/TcpTransportFactory.java tcp/TcpTransportServer.java

Author: chirino
Date: Thu Jul 13 22:54:13 2006
New Revision: 421811

URL: http://svn.apache.org/viewvc?rev=421811&view=rev
Log:
Cleaned up the TCP transport a little.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java?rev=421811&r1=421810&r2=421811&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java Thu Jul 13 22:54:13 2006
@@ -27,18 +27,16 @@
  */
 public abstract class TransportServerSupport extends ServiceSupport implements TransportServer {
 
-    private URI location;
+    private URI connectURI;
+	private URI bindLocation;
     private TransportAcceptListener acceptListener;
 
     public TransportServerSupport() {
     }
 
     public TransportServerSupport(URI location) {
-        this.location = location;
-    }
-
-    public URI getConnectURI() {
-        return location;
+        this.connectURI = location;
+        this.bindLocation = location;
     }
 
     /**
@@ -60,16 +58,16 @@
     /**
      * @return Returns the location.
      */
-    public URI getLocation() {
-        return location;
+    public URI getConnectURI() {
+        return connectURI;
     }
 
     /**
      * @param location
      *            The location to set.
      */
-    public void setLocation(URI location) {
-        this.location = location;
+    public void setConnectURI(URI location) {
+        this.connectURI = location;
     }
 
     protected void onAcceptError(Exception e) {
@@ -77,4 +75,12 @@
             acceptListener.onAcceptError(e);
         }
     }
+
+	public URI getBindLocation() {
+		return bindLocation;
+	}
+
+	public void setBindLocation(URI bindLocation) {
+		this.bindLocation = bindLocation;
+	}
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java?rev=421811&r1=421810&r2=421811&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java Thu Jul 13 22:54:13 2006
@@ -68,7 +68,7 @@
     }
 
     protected void doStart() throws Exception {
-        log.info("Listening for connections at: " + getLocation());
+        log.info("Listening for connections at: " + getConnectURI());
         runner = new Thread(this, "ActiveMQ Transport Server: "+toString());
         runner.setDaemon(daemon);
         runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=421811&r1=421810&r2=421811&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Thu Jul 13 22:54:13 2006
@@ -23,6 +23,7 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketAddress;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.URI;
@@ -49,27 +50,19 @@
 public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
     private static final Log log = LogFactory.getLog(TcpTransport.class);
 
-    private int connectionTimeout = 30000;
-    private int soTimeout = 0;
-    private int socketBufferSize = 128 * 1024;
-    private Socket socket;
-    private DataOutputStream dataOut;
-    private DataInputStream dataIn;
-    private WireFormat wireFormat;
-    private boolean trace;
-    private boolean useLocalHost = true;
-    private int minmumWireFormatVersion;
-    private InetSocketAddress remoteAddress;
-	private InetSocketAddress localAddress;
-    
-    /**
-     * Construct basic helpers
-     * 
-     * @param wireFormat
-     */
-    protected TcpTransport(WireFormat wireFormat) {
-        this.wireFormat = wireFormat;
-    }
+    protected final URI remoteLocation;
+    protected final URI localLocation;
+    protected final WireFormat wireFormat;
+
+    protected int connectionTimeout = 30000;
+    protected int soTimeout = 0;
+    protected int socketBufferSize = 128 * 1024;
+    protected Socket socket;
+    protected DataOutputStream dataOut;
+    protected DataInputStream dataIn;
+    protected boolean trace;
+    protected boolean useLocalHost = true;
+    protected int minmumWireFormatVersion;
 
     /**
      * Connect to a remote Node - e.g. a Broker
@@ -83,10 +76,14 @@
      * @throws UnknownHostException
      */
     public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
-        this(wireFormat);
-        this.socket = createSocket(socketFactory, remoteLocation, localLocation);
+        this.wireFormat = wireFormat;
+        this.socket = socketFactory.createSocket();
+		this.remoteLocation = remoteLocation;
+		this.localLocation = localLocation;
+        setDaemon(false);
     }
 
+
     /**
      * Initialize from a server Socket
      * 
@@ -95,8 +92,10 @@
      * @throws IOException
      */
     public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
-        this(wireFormat);
+        this.wireFormat = wireFormat;
         this.socket = socket;
+		this.remoteLocation = null;
+		this.localLocation = null;
         setDaemon(true);
     }
 
@@ -211,29 +210,6 @@
     // Implementation methods
     // -------------------------------------------------------------------------
 
-    /**
-     * Factory method to create a new socket
-     * 
-     * @param remoteLocation
-     * @param localLocation ignored if null
-     * @return
-     * @throws IOException
-     * @throws IOException
-     * @throws UnknownHostException
-     */
-    protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
-    	
-        String host = resolveHostName(remoteLocation.getHost());
-        remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
-        
-        if( localLocation!=null ) {
-        	localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
-        }
-        
-        Socket sock = socketFactory.createSocket();        
-        return sock;
-    }
-
     protected String resolveHostName(String host) throws UnknownHostException {
         String localName = InetAddress.getLocalHost().getHostName();
         if (localName != null && isUseLocalHost()) {
@@ -263,23 +239,33 @@
     }
 
     protected void doStart() throws Exception {
-        initialiseSocket(socket);
-        if( localAddress!=null ) {
+        connect();
+        super.doStart();
+    }
+
+	protected void connect() throws SocketException, IOException {
+		
+		initialiseSocket(socket);
+		
+        if( localLocation!=null ) {
+        	SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
         	socket.bind(localAddress);
-        }
-        if (remoteAddress != null) {
+        }                
+		if( remoteLocation!=null ) {
+			String host = resolveHostName(remoteLocation.getHost());
+	        InetSocketAddress remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());        
             if (connectionTimeout >= 0) {
                 socket.connect(remoteAddress, connectionTimeout);
             }
             else {
                 socket.connect(remoteAddress);
             }
-        }
+		}
+        
         initializeStreams();
-        super.doStart();
-    }
+	}
 
-    protected void doStop(ServiceStopper stopper) throws Exception {
+    protected void doStop(ServiceStopper stopper) throws Exception {    	
         closeStreams();
         if (socket != null) {
             socket.close();
@@ -303,7 +289,7 @@
     }
 
     public void setSocketOptions(Map socketOptions) {
-        IntrospectionSupport.setProperties(socket, socketOptions);
+    	IntrospectionSupport.setProperties(socket, socketOptions);
     }
 
 	public String getRemoteAddress() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=421811&r1=421810&r2=421811&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Thu Jul 13 22:54:13 2006
@@ -52,6 +52,7 @@
             IntrospectionSupport.setProperties(server, options);
             Map transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
             server.setTransportOption(transportOptions);
+            server.bind();
             
             return server;
         }
@@ -125,7 +126,7 @@
      * @throws UnknownHostException
      * @throws IOException
      */
-	private TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+	protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
 		return new TcpTransport(wf, socketFactory, location, localLocation);
 	}
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=421811&r1=421810&r2=421811&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Thu Jul 13 22:54:13 2006
@@ -35,6 +35,7 @@
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.TransportServerThreadSupport;
+import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,24 +49,47 @@
  */
 
 public class TcpTransportServer extends TransportServerThreadSupport {
+	
     private static final Log log = LogFactory.getLog(TcpTransportServer.class);
-    private ServerSocket serverSocket;
-    private int backlog = 5000;
-    private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
-    private final TcpTransportFactory transportFactory;
-    private long maxInactivityDuration = 30000;
-    private int minmumWireFormatVersion;
-    private boolean trace;
-    private Map transportOptions;
+    protected ServerSocket serverSocket;
+    protected int backlog = 5000;
+    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
+    protected final TcpTransportFactory transportFactory;
+    protected long maxInactivityDuration = 30000;
+    protected int minmumWireFormatVersion;
+    protected boolean trace;
+    protected Map transportOptions;
+    protected final ServerSocketFactory serverSocketFactory;
     
     public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
         super(location);
         this.transportFactory=transportFactory;
-        this.serverSocket = createServerSocket(location, serverSocketFactory);
-        this.serverSocket.setSoTimeout(2000);
-        updatePhysicalUri(location);
+		this.serverSocketFactory = serverSocketFactory;
     }
 
+    public void bind() throws IOException {
+    	URI bind = getBindLocation();
+    	
+        String host = bind.getHost();
+        host = (host == null || host.length() == 0) ? "localhost" : host;
+        InetAddress addr = InetAddress.getByName(host);
+        
+        if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) {
+        	this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog);
+        }
+        else {
+        	this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
+        }
+        this.serverSocket.setSoTimeout(2000);
+        
+        try {
+			setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(bind.getHost()), serverSocket.getLocalPort(), bind.getPath(),
+					bind.getQuery(), bind.getFragment()));
+		} catch (URISyntaxException e) {
+			throw IOExceptionSupport.create(e);
+		}
+    }
+    
     /**
      * @return Returns the wireFormatFactory.
      */
@@ -168,19 +192,7 @@
      * @return pretty print of this
      */
     public String toString() {
-        return ""+getLocation();
-    }
-
-    /**
-     * In cases where we construct ourselves with a zero port we need to
-     * regenerate the URI with the real physical port so that people can connect
-     * to us via discovery
-     * 
-     * @throws UnknownHostException
-     */
-    protected void updatePhysicalUri(URI bindAddr) throws URISyntaxException, UnknownHostException {
-        setLocation(new URI(bindAddr.getScheme(), bindAddr.getUserInfo(), resolveHostName(bindAddr.getHost()), serverSocket.getLocalPort(), bindAddr.getPath(),
-                bindAddr.getQuery(), bindAddr.getFragment()));
+        return ""+getBindLocation();
     }
 
     /**
@@ -196,26 +208,6 @@
             result = InetAddress.getLocalHost().getHostName();
         }
         return result;
-    }
-
-    /**
-     * Factory method to create a new ServerSocket
-     * 
-     * @throws UnknownHostException
-     * @throws IOException
-     */
-    protected ServerSocket createServerSocket(URI bind, ServerSocketFactory factory) throws UnknownHostException, IOException {
-        ServerSocket answer = null;
-        String host = bind.getHost();
-        host = (host == null || host.length() == 0) ? "localhost" : host;
-        InetAddress addr = InetAddress.getByName(host);
-        if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) {
-            answer = factory.createServerSocket(bind.getPort(), backlog);
-        }
-        else {
-            answer = factory.createServerSocket(bind.getPort(), backlog, addr);
-        }
-        return answer;
     }
 
     protected void doStop(ServiceStopper stopper) throws Exception {