You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/02 06:08:53 UTC

svn commit: r418548 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/ main/java/org/apache/activemq/transport/tcp/ test/java/org/apache/activemq/transport/tcp/

Author: chirino
Date: Sat Jul  1 21:08:53 2006
New Revision: 418548

URL: http://svn.apache.org/viewvc?rev=418548&view=rev
Log:
Cleaned up TCP transport a little.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=418548&r1=418547&r2=418548&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java Sat Jul  1 21:08:53 2006
@@ -198,13 +198,34 @@
         return "default";
     }
 
-    protected Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
-        IntrospectionSupport.setProperties(transport, options);
+    /**
+     * Fully configures and adds all needed transport filters so that the transport
+     * can be used by the JMS client.
+     * 
+     * @param transport
+     * @param wf
+     * @param options
+     * @return
+     * @throws Exception
+     */
+    public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
+    	transport = compositeConfigure(transport, wf, options);
+    	
         transport = new MutexTransport(transport);
         transport = new ResponseCorrelator(transport);
+        
         return transport;
     }
 
+    /**
+     * Similar to configure(...) but this avoids adding the MutexTransport and ResponseCorrelator transport layers
+     * so that the resulting transport can more efficiently be used as part of a composite transport.
+     * 
+     * @param transport
+     * @param format
+     * @param options
+     * @return
+     */
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
         IntrospectionSupport.setProperties(transport, options);
         return transport;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=418548&r1=418547&r2=418548&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java Sat Jul  1 21:08:53 2006
@@ -21,7 +21,7 @@
  */
 public class TransportFilter implements TransportListener,Transport{
     final protected Transport next;
-    private TransportListener transportListener;
+    protected TransportListener transportListener;
 
     public TransportFilter(Transport next){
         this.next=next;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=418548&r1=418547&r2=418548&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Sat Jul  1 21:08:53 2006
@@ -23,13 +23,14 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.net.SocketAddress;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.util.Map;
 
+import javax.net.SocketFactory;
+
 import org.apache.activeio.command.WireFormat;
 import org.apache.activemq.Service;
 import org.apache.activemq.command.Command;
@@ -40,8 +41,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import javax.net.SocketFactory;
-
 /**
  * An implementation of the {@link Transport} interface using raw tcp/ip
  * 
@@ -60,10 +59,8 @@
     private boolean trace;
     private boolean useLocalHost = true;
     private int minmumWireFormatVersion;
-    private InetSocketAddress socketAddress;
-
-    private Map socketOptions;
-
+    private InetSocketAddress remoteAddress;
+	private InetSocketAddress localAddress;
     
     /**
      * Construct basic helpers
@@ -78,19 +75,6 @@
      * Connect to a remote Node - e.g. a Broker
      * 
      * @param wireFormat
-     * @param remoteLocation
-     * @throws IOException
-     * @throws UnknownHostException
-     */
-    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation) throws UnknownHostException, IOException {
-        this(wireFormat);
-        this.socket = createSocket(socketFactory, remoteLocation);
-    }
-
-    /**
-     * Connect to a remote Node - e.g. a Broker
-     * 
-     * @param wireFormat
      * @param socketFactory 
      * @param remoteLocation
      * @param localLocation -
@@ -231,36 +215,22 @@
      * Factory method to create a new socket
      * 
      * @param remoteLocation
-     *            the URI to connect to
-     * @return the newly created socket
-     * @throws UnknownHostException
-     * @throws IOException
-     */
-    protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation) throws UnknownHostException, IOException {
-        String host = resolveHostName(remoteLocation.getHost());
-        socketAddress = new InetSocketAddress(host, remoteLocation.getPort());
-        Socket sock = socketFactory.createSocket();
-        return sock;
-    }
-
-    /**
-     * Factory method to create a new socket
-     * 
-     * @param remoteLocation
-     * @param localLocation
+     * @param localLocation ignored if null
      * @return
      * @throws IOException
      * @throws IOException
      * @throws UnknownHostException
      */
     protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
+    	
         String host = resolveHostName(remoteLocation.getHost());
-        SocketAddress sockAddress = new InetSocketAddress(host, remoteLocation.getPort());
-        SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
-        Socket sock = socketFactory.createSocket();
-        initialiseSocket(sock);
-        sock.bind(localAddress);
-        sock.connect(sockAddress);
+        remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+        
+        if( localLocation!=null ) {
+        	localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
+        }
+        
+        Socket sock = socketFactory.createSocket();        
         return sock;
     }
 
@@ -293,12 +263,15 @@
 
     protected void doStart() throws Exception {
         initialiseSocket(socket);
-        if (socketAddress != null) {
+        if( localAddress!=null ) {
+        	socket.bind(localAddress);
+        }
+        if (remoteAddress != null) {
             if (connectionTimeout >= 0) {
-                socket.connect(socketAddress, connectionTimeout);
+                socket.connect(remoteAddress, connectionTimeout);
             }
             else {
-                socket.connect(socketAddress);
+                socket.connect(remoteAddress);
             }
         }
         initializeStreams();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=418548&r1=418547&r2=418548&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Sat Jul  1 21:08:53 2006
@@ -29,8 +29,6 @@
 import org.apache.activeio.command.WireFormat;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.InactivityMonitor;
-import org.apache.activemq.transport.MutexTransport;
-import org.apache.activemq.transport.ResponseCorrelator;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportLogger;
@@ -49,7 +47,7 @@
             Map options = new HashMap(URISupport.parseParamters(location));
 
             ServerSocketFactory serverSocketFactory = createServerSocketFactory();
-            TcpTransportServer server = new TcpTransportServer(location, serverSocketFactory);
+            TcpTransportServer server = createTcpTransportServer(location, serverSocketFactory);
             server.setWireFormatFactory(createWireFormatFactory(options));
             IntrospectionSupport.setProperties(server, options);
             Map transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
@@ -62,31 +60,24 @@
         }
     }
 
-    public Transport configure(Transport transport, WireFormat format, Map options) {
-        IntrospectionSupport.setProperties(transport, options);
-        TcpTransport tcpTransport = (TcpTransport) transport;
-        Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");        
-        tcpTransport.setSocketOptions(socketOptions);
-
-        if (tcpTransport.isTrace()) {
-            transport = new TransportLogger(transport);
-        }
-
-        transport = new InactivityMonitor(transport);
-
-        // Only need the OpenWireFormat if using openwire
-        if( format instanceof OpenWireFormat ) {
-        	transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
-        }
-        
-        transport = new MutexTransport(transport);
-        transport = new ResponseCorrelator(transport);
-        return transport;
-    }
+    /**
+     * Allows subclasses of TcpTransportFactory to create custom instances of TcpTransportServer.
+     * 
+     * @param location
+     * @param serverSocketFactory
+     * @return
+     * @throws IOException
+     * @throws URISyntaxException
+     */
+	protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+		return new TcpTransportServer(this, location, serverSocketFactory);
+	}
 
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        IntrospectionSupport.setProperties(transport, options);
-        TcpTransport tcpTransport = (TcpTransport) transport;
+        
+        TcpTransport tcpTransport = (TcpTransport) transport.narrow(TcpTransport.class);
+        IntrospectionSupport.setProperties(tcpTransport, options);
+        
         Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");        
         tcpTransport.setSocketOptions(socketOptions);
 
@@ -96,7 +87,7 @@
 
         transport = new InactivityMonitor(transport);
 
-        // Only need the OpenWireFormat if using openwire
+        // Only need the WireFormatNegotiator if using openwire
         if( format instanceof OpenWireFormat ) {
         	transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
         }
@@ -120,11 +111,23 @@
             }
         }
         SocketFactory socketFactory = createSocketFactory();
-        if (localLocation != null) {
-            return new TcpTransport(wf, socketFactory, location, localLocation);
-        }
-        return new TcpTransport(wf, socketFactory, location);
+        return createTcpTransport(wf, socketFactory, location, localLocation);
     }
+
+    /**
+     * Allows subclasses of TcpTransportFactory to create custom TcpTransport instances.
+     * 
+     * @param location
+     * @param wf
+     * @param socketFactory
+     * @param localLocation 
+     * @return
+     * @throws UnknownHostException
+     * @throws IOException
+     */
+	private TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+		return new TcpTransport(wf, socketFactory, location, localLocation);
+	}
 
     protected ServerSocketFactory createServerSocketFactory() {
         return ServerSocketFactory.getDefault();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=418548&r1=418547&r2=418548&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Sat Jul  1 21:08:53 2006
@@ -52,16 +52,17 @@
     private ServerSocket serverSocket;
     private int backlog = 5000;
     private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
-    private TcpTransportFactory transportFactory = new TcpTransportFactory();
+    private final TcpTransportFactory transportFactory;
     private long maxInactivityDuration = 30000;
     private int minmumWireFormatVersion;
     private boolean trace;
     private Map transportOptions;
     
-    public TcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
         super(location);
-        serverSocket = createServerSocket(location, serverSocketFactory);
-        serverSocket.setSoTimeout(2000);
+        this.transportFactory=transportFactory;
+        this.serverSocket = createServerSocket(location, serverSocketFactory);
+        this.serverSocket.setSoTimeout(2000);
         updatePhysicalUri(location);
     }
 
@@ -132,7 +133,7 @@
                         options.put("trace", new Boolean(trace));
                         options.putAll(transportOptions);
                         WireFormat format = wireFormatFactory.createWireFormat();
-                        TcpTransport transport = new TcpTransport(format, socket);
+                        Transport transport = createTransport(socket, format);
                         Transport configuredTransport = transportFactory.configure(transport, format, options);
                         getAcceptListener().onAccept(configuredTransport);
                     }
@@ -151,6 +152,17 @@
             }
         }
     }
+
+    /**
+     * Allow derived classes to override the Transport implementation that this transport server creates.
+     * @param socket
+     * @param format
+     * @return
+     * @throws IOException
+     */
+	protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+		return new TcpTransport(format, socket);
+	}
 
     /**
      * @return pretty print of this

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java?rev=418548&r1=418547&r2=418548&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java Sat Jul  1 21:08:53 2006
@@ -148,7 +148,7 @@
         // 
         // Manually create a client transport so that it does not send KeepAlive packets.
         // this should simulate a client hang.
-        clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:61616"));
+        clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:61616"), null);
         clientTransport.setTransportListener(new TransportListener() {
             public void onCommand(Command command) {
                 clientReceiveCount.incrementAndGet();