You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/04 17:01:23 UTC
svn commit: r419024 - in
/incubator/activemq/branches/activemq-4.0/activemq-core/src:
main/java/org/apache/activemq/transport/
main/java/org/apache/activemq/transport/tcp/
test/java/org/apache/activemq/transport/stomp/
test/java/org/apache/activemq/tra...
Author: chirino
Date: Tue Jul 4 08:01:22 2006
New Revision: 419024
URL: http://svn.apache.org/viewvc?rev=419024&view=rev
Log:
Merged in revision 418548 from trunk since that was needed for the stomp backport
http://issues.apache.org/activemq/browse/AMQ-793
Removed:
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java
Modified:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=419024&r1=419023&r2=419024&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java Tue Jul 4 08:01:22 2006
@@ -198,13 +198,34 @@
return "default";
}
- protected Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
- IntrospectionSupport.setProperties(transport, options);
+ /**
+ * Fully configures and adds all need transport filters so that the transport
+ * can be used by the JMS client.
+ *
+ * @param transport
+ * @param wf
+ * @param options
+ * @return
+ * @throws Exception
+ */
+ public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
+ transport = compositeConfigure(transport, wf, options);
+
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
+
return transport;
}
+ /**
+ * Similar to configure(...) but this avoid adding in the MutexTransport and ResponseCorrelator transport layers
+ * so that the resulting transport can more efficiently be used as part of a composite transport.
+ *
+ * @param transport
+ * @param format
+ * @param options
+ * @return
+ */
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
IntrospectionSupport.setProperties(transport, options);
return transport;
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=419024&r1=419023&r2=419024&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java Tue Jul 4 08:01:22 2006
@@ -21,7 +21,7 @@
*/
public class TransportFilter implements TransportListener,Transport{
final protected Transport next;
- private TransportListener transportListener;
+ protected TransportListener transportListener;
public TransportFilter(Transport next){
this.next=next;
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=419024&r1=419023&r2=419024&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Tue Jul 4 08:01:22 2006
@@ -23,13 +23,14 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Map;
+import javax.net.SocketFactory;
+
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
@@ -40,8 +41,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import javax.net.SocketFactory;
-
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
*
@@ -60,10 +59,8 @@
private boolean trace;
private boolean useLocalHost = true;
private int minmumWireFormatVersion;
- private InetSocketAddress socketAddress;
-
- private Map socketOptions;
-
+ private InetSocketAddress remoteAddress;
+ private InetSocketAddress localAddress;
/**
* Construct basic helpers
@@ -78,19 +75,6 @@
* Connect to a remote Node - e.g. a Broker
*
* @param wireFormat
- * @param remoteLocation
- * @throws IOException
- * @throws UnknownHostException
- */
- public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation) throws UnknownHostException, IOException {
- this(wireFormat);
- this.socket = createSocket(socketFactory, remoteLocation);
- }
-
- /**
- * Connect to a remote Node - e.g. a Broker
- *
- * @param wireFormat
* @param socketFactory
* @param remoteLocation
* @param localLocation -
@@ -231,36 +215,22 @@
* Factory method to create a new socket
*
* @param remoteLocation
- * the URI to connect to
- * @return the newly created socket
- * @throws UnknownHostException
- * @throws IOException
- */
- protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation) throws UnknownHostException, IOException {
- String host = resolveHostName(remoteLocation.getHost());
- socketAddress = new InetSocketAddress(host, remoteLocation.getPort());
- Socket sock = socketFactory.createSocket();
- return sock;
- }
-
- /**
- * Factory method to create a new socket
- *
- * @param remoteLocation
- * @param localLocation
+ * @param localLocation ignored if null
* @return
* @throws IOException
* @throws IOException
* @throws UnknownHostException
*/
protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
+
String host = resolveHostName(remoteLocation.getHost());
- SocketAddress sockAddress = new InetSocketAddress(host, remoteLocation.getPort());
- SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
- Socket sock = socketFactory.createSocket();
- initialiseSocket(sock);
- sock.bind(localAddress);
- sock.connect(sockAddress);
+ remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+
+ if( localLocation!=null ) {
+ localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
+ }
+
+ Socket sock = socketFactory.createSocket();
return sock;
}
@@ -293,12 +263,15 @@
protected void doStart() throws Exception {
initialiseSocket(socket);
- if (socketAddress != null) {
+ if( localAddress!=null ) {
+ socket.bind(localAddress);
+ }
+ if (remoteAddress != null) {
if (connectionTimeout >= 0) {
- socket.connect(socketAddress, connectionTimeout);
+ socket.connect(remoteAddress, connectionTimeout);
}
else {
- socket.connect(socketAddress);
+ socket.connect(remoteAddress);
}
}
initializeStreams();
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=419024&r1=419023&r2=419024&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Tue Jul 4 08:01:22 2006
@@ -29,8 +29,6 @@
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.InactivityMonitor;
-import org.apache.activemq.transport.MutexTransport;
-import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportLogger;
@@ -49,7 +47,7 @@
Map options = new HashMap(URISupport.parseParamters(location));
ServerSocketFactory serverSocketFactory = createServerSocketFactory();
- TcpTransportServer server = new TcpTransportServer(location, serverSocketFactory);
+ TcpTransportServer server = createTcpTransportServer(location, serverSocketFactory);
server.setWireFormatFactory(createWireFormatFactory(options));
IntrospectionSupport.setProperties(server, options);
Map transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
@@ -62,31 +60,24 @@
}
}
- public Transport configure(Transport transport, WireFormat format, Map options) {
- IntrospectionSupport.setProperties(transport, options);
- TcpTransport tcpTransport = (TcpTransport) transport;
- Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
- tcpTransport.setSocketOptions(socketOptions);
-
- if (tcpTransport.isTrace()) {
- transport = new TransportLogger(transport);
- }
-
- transport = new InactivityMonitor(transport);
-
- // Only need the OpenWireFormat if using openwire
- if( format instanceof OpenWireFormat ) {
- transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
- }
-
- transport = new MutexTransport(transport);
- transport = new ResponseCorrelator(transport);
- return transport;
- }
+ /**
+ * Allows subclasses of TcpTransportFactory to create custom instances of TcpTransportServer.
+ *
+ * @param location
+ * @param serverSocketFactory
+ * @return
+ * @throws IOException
+ * @throws URISyntaxException
+ */
+ protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+ return new TcpTransportServer(this, location, serverSocketFactory);
+ }
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
- IntrospectionSupport.setProperties(transport, options);
- TcpTransport tcpTransport = (TcpTransport) transport;
+
+ TcpTransport tcpTransport = (TcpTransport) transport.narrow(TcpTransport.class);
+ IntrospectionSupport.setProperties(tcpTransport, options);
+
Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
tcpTransport.setSocketOptions(socketOptions);
@@ -96,7 +87,7 @@
transport = new InactivityMonitor(transport);
- // Only need the OpenWireFormat if using openwire
+ // Only need the WireFormatNegotiator if using openwire
if( format instanceof OpenWireFormat ) {
transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
}
@@ -120,11 +111,23 @@
}
}
SocketFactory socketFactory = createSocketFactory();
- if (localLocation != null) {
- return new TcpTransport(wf, socketFactory, location, localLocation);
- }
- return new TcpTransport(wf, socketFactory, location);
+ return createTcpTransport(wf, socketFactory, location, localLocation);
}
+
+ /**
+ * Allows subclasses of TcpTransportFactory to provide a create custom TcpTransport intances.
+ *
+ * @param location
+ * @param wf
+ * @param socketFactory
+ * @param localLocation
+ * @return
+ * @throws UnknownHostException
+ * @throws IOException
+ */
+ private TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+ return new TcpTransport(wf, socketFactory, location, localLocation);
+ }
protected ServerSocketFactory createServerSocketFactory() {
return ServerSocketFactory.getDefault();
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=419024&r1=419023&r2=419024&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Tue Jul 4 08:01:22 2006
@@ -52,16 +52,17 @@
private ServerSocket serverSocket;
private int backlog = 5000;
private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
- private TcpTransportFactory transportFactory = new TcpTransportFactory();
+ private final TcpTransportFactory transportFactory;
private long maxInactivityDuration = 30000;
private int minmumWireFormatVersion;
private boolean trace;
private Map transportOptions;
- public TcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+ public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
super(location);
- serverSocket = createServerSocket(location, serverSocketFactory);
- serverSocket.setSoTimeout(2000);
+ this.transportFactory=transportFactory;
+ this.serverSocket = createServerSocket(location, serverSocketFactory);
+ this.serverSocket.setSoTimeout(2000);
updatePhysicalUri(location);
}
@@ -132,7 +133,7 @@
options.put("trace", new Boolean(trace));
options.putAll(transportOptions);
WireFormat format = wireFormatFactory.createWireFormat();
- TcpTransport transport = new TcpTransport(format, socket);
+ Transport transport = createTransport(socket, format);
Transport configuredTransport = transportFactory.configure(transport, format, options);
getAcceptListener().onAccept(configuredTransport);
}
@@ -151,6 +152,17 @@
}
}
}
+
+ /**
+ * Allow derived classes to override the Transport implementation that this transport server creates.
+ * @param socket
+ * @param format
+ * @return
+ * @throws IOException
+ */
+ protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+ return new TcpTransport(format, socket);
+ }
/**
* @return pretty print of this
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java?rev=419024&r1=419023&r2=419024&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java Tue Jul 4 08:01:22 2006
@@ -148,7 +148,7 @@
//
// Manually create a client transport so that it does not send KeepAlive packets.
// this should simulate a client hang.
- clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:61616"));
+ clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:61616"), null);
clientTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
clientReceiveCount.incrementAndGet();