You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by ch...@apache.org on 2003/10/21 16:24:40 UTC
cvs commit: incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/nio NonBlockingChannel.java NonBlockingServer.java
chirino 2003/10/21 07:24:40
Modified: modules/core/src/java/org/apache/geronimo/remoting/transport/async
AbstractServer.java Channel.java ChannelPool.java
modules/core/src/java/org/apache/geronimo/remoting/transport/async/bio
BlockingChannel.java BlockingServer.java
modules/core/src/java/org/apache/geronimo/remoting/transport/async/nio
NonBlockingChannel.java NonBlockingServer.java
Log:
Fix so that the async transport does not hang when a server establishes a backchannel to the client.
Revision Changes Path
1.3 +2 -1 incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/AbstractServer.java
Index: AbstractServer.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/AbstractServer.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- AbstractServer.java 29 Aug 2003 19:16:53 -0000 1.2
+++ AbstractServer.java 21 Oct 2003 14:24:39 -0000 1.3
@@ -81,6 +81,7 @@
Long.parseLong(System.getProperty("org.apache.geronimo.remoting.transport.async.connection_timeout", "300000"));
// 5 min.
+ private HashMap uriTo= new HashMap();
private HashMap channelPools = new HashMap();
/**
1.2 +2 -2 incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/Channel.java
Index: Channel.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/Channel.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- Channel.java 22 Aug 2003 02:23:26 -0000 1.1
+++ Channel.java 21 Oct 2003 14:24:39 -0000 1.2
@@ -81,7 +81,7 @@
* @throws IOException
* @throws ConnectionFailedException
*/
- public void open(URI uri, ChannelListner listner) throws TransportException;
+ public void open(URI uri, URI backConnectURI, ChannelListner listner) throws TransportException;
/**
* starts an accepted connection.
1.4 +35 -11 incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/ChannelPool.java
Index: ChannelPool.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/ChannelPool.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- ChannelPool.java 6 Sep 2003 13:52:11 -0000 1.3
+++ ChannelPool.java 21 Oct 2003 14:24:39 -0000 1.4
@@ -83,7 +83,8 @@
private static final Log log = LogFactory.getLog(ChannelPool.class);
- private final URI uri;
+ private final URI remoteURI;
+ private URI backConnectURI;
private final List available = new ArrayList();
private final Correlator responseManager = new Correlator();
private Router dispatcher;
@@ -96,10 +97,18 @@
* @param uri
*/
public ChannelPool(URI uri, Router dispatcher) {
- this.uri = uri;
+ this.remoteURI = uri;
this.dispatcher = dispatcher;
+ try {
+ if (Registry.instance.getServerForClientRequest() == null) {
+ backConnectURI = new URI("async://localhost:0");
+ } else {
+ backConnectURI = Registry.instance.getServerForClientRequest().getClientConnectURI();
+ }
+ } catch (Exception e) {
+ }
}
-
+
public void dispose() {
Iterator iterator;
synchronized (available) {
@@ -138,9 +147,9 @@
createdChannelCount++;
}
- public void open(URI uri) throws TransportException, TransportException {
+ public void open(URI uri, URI localuri) throws TransportException, TransportException {
try {
- next.open(uri, this);
+ next.open(uri, localuri, this);
} catch (TransportException e) {
doCloseInternal = true;
throw e;
@@ -286,15 +295,15 @@
} while (!maxOpenConnections.attempt(100));
} catch (InterruptedException e1) {
- throw new TransportException("(" + uri + "): " + e1);
+ throw new TransportException("(" + remoteURI + "): " + e1);
}
// not available, make one on demand
try {
- log.debug("channel connecting to: " + uri);
+ log.debug("channel connecting to: " + remoteURI);
PooledAsynchChannel c = new PooledAsynchChannel(TransportFactory.instance.createAsynchChannel());
- c.open(uri);
+ c.open(remoteURI, backConnectURI);
return c;
} catch (Exception e) {
@@ -302,9 +311,9 @@
maxOpenConnections.release();
log.debug("Connect Failed: ", e);
if (log.isDebugEnabled()) {
- log.debug("channel connection to: " + uri + " failed", e);
+ log.debug("channel connection to: " + remoteURI + " failed", e);
}
- throw new TransportException("(" + uri + "): " + e);
+ throw new TransportException("(" + remoteURI + "): " + e);
}
}
@@ -481,6 +490,21 @@
public int getCreatedChannelCount() {
return createdChannelCount;
+ }
+
+
+ /**
+ * @return Returns the backConnectURI.
+ */
+ public URI getBackConnectURI() {
+ return backConnectURI;
+ }
+
+ /**
+ * @param backConnectURI The backConnectURI to set.
+ */
+ public void setBackConnectURI(URI backConnectURI) {
+ this.backConnectURI = backConnectURI;
}
}
1.2 +27 -20 incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/bio/BlockingChannel.java
Index: BlockingChannel.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/bio/BlockingChannel.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- BlockingChannel.java 22 Aug 2003 02:23:26 -0000 1.1
+++ BlockingChannel.java 21 Oct 2003 14:24:39 -0000 1.2
@@ -62,7 +62,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.StreamCorruptedException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
@@ -84,7 +83,6 @@
import org.apache.geronimo.remoting.transport.async.AsyncMsg;
import org.apache.geronimo.remoting.transport.async.Channel;
import org.apache.geronimo.remoting.transport.async.ChannelListner;
-import org.apache.geronimo.remoting.transport.async.Registry;
/**
* The Blocking implementation of the AsynchChannel interface.
*
@@ -99,13 +97,15 @@
private ChannelListner listner;
private Thread worker;
private SocketChannel socketChannel;
- private URI remoteURI;
private boolean closing = false;
private Inflater inflator;
private Deflater deflater;
- public void open(URI remoteURI, ChannelListner listner) throws TransportException {
+ private URI remoteURI;
+ private URI requestedURI;
+
+ public void open(URI remoteURI, URI backConnectURI, ChannelListner listner) throws TransportException {
if (log.isTraceEnabled())
log.trace("Connecting to : " + remoteURI);
@@ -132,10 +132,7 @@
DataOutputStream out = new DataOutputStream(socketChannel.socket().getOutputStream());
out.writeUTF(remoteURI.toString());
- if (Registry.instance.getServerForClientRequest() == null)
- out.writeUTF("async://" + socketChannel.socket().getLocalAddress().getHostAddress() + ":0");
- else
- out.writeUTF(Registry.instance.getServerForClientRequest().getClientConnectURI().toString());
+ out.writeUTF(backConnectURI.toString());
out.flush();
if (compression != -1) {
@@ -165,15 +162,14 @@
// the source vm. Could be null.
String sourceURI = in.readUTF();
this.remoteURI = new URI(sourceURI);
+ this.requestedURI = new URI(destURI);
if (log.isTraceEnabled()) {
- log.trace("Connected from : " + remoteURI);
- log.trace("Request URI : " + destURI);
+ log.trace("Remote URI : " + remoteURI);
+ log.trace("Requested URI : " + requestedURI);
}
-
- // What options did the client want to use with this connection?
- URI rruri = new URI(destURI);
+
boolean enableTcpNoDelay = true;
- Properties params = URISupport.parseQueryParameters(rruri);
+ Properties params = URISupport.parseQueryParameters(requestedURI);
enableTcpNoDelay = params.getProperty("tcp.nodelay", "true").equals("true");
int compression = Integer.parseInt((String) params.getProperty("compression", "-1"));
@@ -278,8 +274,9 @@
log.trace("Waiting for message");
message[0].clear();
socketChannel.read(message[0]);
- if( message[0].position()!=4 )
- throw new StreamCorruptedException("Did not receive the full message header.");
+ while( message[0].position()!=4 ) {
+ socketChannel.read(message[0]);
+ }
message[0].flip();
int size = message[0].getInt();
@@ -296,13 +293,15 @@
message[1].clear();
message[1].limit(size);
socketChannel.read(message[1]);
- if( message[1].position()!=size )
- throw new StreamCorruptedException("Did not receive the full message body.");
+ while( message[1].position()!=size ) {
+ socketChannel.read(message[1]);
+ }
message[1].flip();
listner.receiveEvent(deserialize(message));
}
+ log.trace("Stopping due to remote end closing.");
} catch (IOException e) {
- // The remote end died on us.
+ log.trace("Stopping due to exception.", e);
} finally {
asyncClose();
}
@@ -393,4 +392,12 @@
public URI getRemoteURI() {
return remoteURI;
}
+
+ /**
+ * @return Returns the requestedURI.
+ */
+ public URI getRequestedURI() {
+ return requestedURI;
+ }
+
}
1.4 +4 -1 incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/bio/BlockingServer.java
Index: BlockingServer.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/bio/BlockingServer.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- BlockingServer.java 30 Aug 2003 14:49:25 -0000 1.3
+++ BlockingServer.java 21 Oct 2003 14:24:39 -0000 1.4
@@ -188,11 +188,14 @@
continue;
}
try {
+
socket.socket().setTcpNoDelay(enableTcpNoDelay);
BlockingChannel channel = new BlockingChannel();
channel.init(connectURI, socket);
ChannelPool pool = getChannelPool(channel.getRemoteURI());
+ pool.setBackConnectURI( channel.getRequestedURI() );
pool.associate(channel);
+
} catch (TransportException ie) {
log.debug("Client connection could not be accepted: ", ie);
} catch (IOException ie) {
1.2 +18 -7 incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/nio/NonBlockingChannel.java
Index: NonBlockingChannel.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/nio/NonBlockingChannel.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- NonBlockingChannel.java 22 Aug 2003 02:23:27 -0000 1.1
+++ NonBlockingChannel.java 21 Oct 2003 14:24:39 -0000 1.2
@@ -84,7 +84,6 @@
import org.apache.geronimo.remoting.transport.async.AsyncMsg;
import org.apache.geronimo.remoting.transport.async.Channel;
import org.apache.geronimo.remoting.transport.async.ChannelListner;
-import org.apache.geronimo.remoting.transport.async.Registry;
import EDU.oswego.cs.dl.util.concurrent.Mutex;
/**
@@ -110,7 +109,9 @@
private SelectorManager selectorManager;
private SelectionKey selectionKey;
- public void open(URI remoteURI, ChannelListner listner) throws TransportException {
+ private URI requestedURI;
+
+ public void open(URI remoteURI, URI backConnectURI, ChannelListner listner) throws TransportException {
if (log.isTraceEnabled())
log.trace("Connecting to : " + remoteURI);
@@ -137,10 +138,13 @@
DataOutputStream out = new DataOutputStream(socketChannel.socket().getOutputStream());
out.writeUTF(remoteURI.toString());
+ out.writeUTF(backConnectURI.toString());
+ /*
if (Registry.instance.getServerForClientRequest() == null)
out.writeUTF("async://" + socketChannel.socket().getLocalAddress().getHostAddress() + ":0");
else
out.writeUTF(Registry.instance.getServerForClientRequest().getClientConnectURI().toString());
+ */
out.flush();
if (compression != -1) {
@@ -173,15 +177,15 @@
// the source vm. Could be null.
String sourceURI = in.readUTF();
this.remoteURI = new URI(sourceURI);
+ this.requestedURI = new URI(destURI);
if (log.isTraceEnabled()) {
- log.trace("Connected from : " + remoteURI);
- log.trace("Request URI : " + destURI);
+ log.trace("Remote URI : " + remoteURI);
+ log.trace("Requested URI : " + requestedURI);
}
// What options did the client want to use with this connection?
- URI rruri = new URI(destURI);
boolean enableTcpNoDelay = true;
- Properties params = URISupport.parseQueryParameters(rruri);
+ Properties params = URISupport.parseQueryParameters(requestedURI);
enableTcpNoDelay = params.getProperty("tcp.nodelay", "true").equals("true");
int compression = Integer.parseInt((String) params.getProperty("compression", "-1"));
@@ -468,6 +472,13 @@
log.debug("Communications error, closing connection: ", e);
asyncClose();
}
+ }
+
+ /**
+ * @return
+ */
+ public URI getRequestedURI() {
+ return requestedURI;
}
}
1.4 +2 -1 incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/nio/NonBlockingServer.java
Index: NonBlockingServer.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/nio/NonBlockingServer.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- NonBlockingServer.java 30 Aug 2003 14:49:25 -0000 1.3
+++ NonBlockingServer.java 21 Oct 2003 14:24:39 -0000 1.4
@@ -192,6 +192,7 @@
NonBlockingChannel channel = new NonBlockingChannel();
channel.init(connectURI, socketChannel);
ChannelPool pool = getChannelPool(channel.getRemoteURI());
+ pool.setBackConnectURI( channel.getRequestedURI() );
pool.associate(channel);
} catch (TransportException ie) {
log.debug("Client connection could not be accepted: ", ie);